From 782ad8fda47c36d8233fa75573b23def67f89e65 Mon Sep 17 00:00:00 2001
From: Konstantin Knizhnik
Date: Sun, 16 Feb 2025 18:14:34 +0200
Subject: [PATCH] Implement PS communicator

---
 pgxn/neon/libpagestore.c     |  824 +++++++------
 pgxn/neon/pagestore_client.h |   76 +-
 pgxn/neon/pagestore_smgr.c   | 2122 ++--------------------------------
 3 files changed, 633 insertions(+), 2389 deletions(-)

diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c
index 22aeb2e2d6..c390c50b4f 100644
--- a/pgxn/neon/libpagestore.c
+++ b/pgxn/neon/libpagestore.c
@@ -1,4 +1,4 @@
-/*-------------------------------------------------------------------------
+/*-------------------------------------------------------------------------
 *
 * libpagestore.c
 *	  Handles network communications with the remote pagestore.
@@ -12,6 +12,8 @@
 *
 *-------------------------------------------------------------------------
 */
+#include <pthread.h>
+
 #include "postgres.h"

 #include "access/xlog.h"
@@ -45,6 +47,9 @@

 #define MIN_RECONNECT_INTERVAL_USEC	1000
 #define MAX_RECONNECT_INTERVAL_USEC	1000000
+#define RECEIVER_RETRY_DELAY	1000000
+#define MAX_REQUEST_SIZE		1024
+#define MAX_PS_QUERY_LENGTH		256

 /* GUCs */
 char	   *neon_timeline;
@@ -54,57 +59,45 @@ char *page_server_connstring;
 char	   *neon_auth_token;

 int			readahead_buffer_size = 128;
-int			flush_every_n_requests = 8;
+int			parallel_connections = 10;

 int			neon_protocol_version = 2;

-static int	max_reconnect_attempts = 60;
 static int	stripe_size;

+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+
+void CommunicatorMain(Datum main_arg);
+
 typedef struct
 {
	char		connstring[MAX_SHARDS][MAX_PAGESERVER_CONNSTRING_SIZE];
	size_t		num_shards;
 } ShardMap;

-/*
- * PagestoreShmemState is kept in shared memory. It contains the connection
- * strings for each shard.
- *
- * The "neon.pageserver_connstring" GUC is marked with the PGC_SIGHUP option,
- * allowing it to be changed using pg_reload_conf(). The control plane can
- * update the connection string if the pageserver crashes, is relocated, or
- * new shards are added. A parsed copy of the current value of the GUC is kept
- * in shared memory, updated by the postmaster, because regular backends don't
- * reload the config during query execution, but we might need to re-establish
- * the pageserver connection with the new connection string even in the middle
- * of a query.
- *
- * The shared memory copy is protected by a lockless algorithm using two
- * atomic counters. The counters allow a backend to quickly check if the value
- * has changed since last access, and to detect and retry copying the value if
- * the postmaster changes the value concurrently. (Postmaster doesn't have a
- * PGPROC entry and therefore cannot use LWLocks.)
- */
-typedef struct
-{
-	pg_atomic_uint64 begin_update_counter;
-	pg_atomic_uint64 end_update_counter;
-	ShardMap	shard_map;
-} PagestoreShmemState;
+static ShardMap shard_map;

 #if PG_VERSION_NUM >= 150000
 static shmem_request_hook_type prev_shmem_request_hook = NULL;
 #endif
 static shmem_startup_hook_type prev_shmem_startup_hook;
-static PagestoreShmemState *pagestore_shared;
-static uint64 pagestore_local_counter = 0;
+
+static NeonCommunicatorResponse* responses;	/* for each backend */
+static NeonCommunicatorChannel* channels;
+
+#if PG_VERSION_NUM < 170000
+int MyProcNumber;
+#endif
+
+static bool am_communicator = false;

 typedef enum PSConnectionState {
	PS_Disconnected,			/* no connection yet */
	PS_Connecting_Startup,		/* connection starting up */
-	PS_Connecting_PageStream,	/* negotiating pagestream */
+	PS_Connecting_PageStream,	/* negotiating pagestream */
	PS_Connected,				/* connected, pagestream established */
+	PS_Expired,					/* connection should be reconnected */
 } PSConnectionState;

 /* This backend's per-shard connections */
@@ -138,16 +131,24 @@ typedef struct
	WaitEventSet *wes_read;
 } PageServer;

-static PageServer page_servers[MAX_SHARDS];
+static PageServer* page_servers;

-static bool pageserver_flush(shardno_t shard_no);
-static void pageserver_disconnect(shardno_t shard_no);
-static void pageserver_disconnect_shard(shardno_t shard_no);
+static bool pageserver_flush(int chan_no);
+static void pageserver_disconnect(int chan_no);
+static Size NumberOfChannels(void);

-static bool
-PagestoreShmemIsValid(void)
+
+static void* communicator_read_loop(void* arg);
+static void* communicator_write_loop(void* arg);
+
+static void
+log_error_message(NeonErrorResponse* err)
 {
-	return pagestore_shared && UsedShmemSegAddr;
+	int save_pid = MyProcPid;
+	pthread_mutex_lock(&mutex);
+	MyProcPid = ProcGlobal->allProcs[err->req.u.recepient.procno].pid;
+	neon_log(LOG, "Server returned error for request %d: %s", err->req.tag, err->message);
+	MyProcPid = save_pid;
+	pthread_mutex_unlock(&mutex);
 }

 /*
@@ -218,13 +219,16 @@ static void
 AssignPageserverConnstring(const char *newval, void *extra)
 {
-	ShardMap	shard_map;
+	size_t		old_num_shards;

	/*
	 * Only postmaster updates the copy in shared memory.
	 */
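+	/*
+	 * With the communicator, reloading the shard map (re)parses the
+	 * connection strings, lazily allocates the per-channel PageServer array,
+	 * marks connected channels of already existing shards as expired so that
+	 * they get reconnected with the new connection string, and spawns a
+	 * writer/reader thread pair for every newly added channel (see the loops
+	 * below).
+	 */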
-	if (!PagestoreShmemIsValid() || IsUnderPostmaster)
+	if (am_communicator)
		return;

+	old_num_shards = shard_map.num_shards;
+
	if (!ParseShardMap(newval, &shard_map))
	{
		/*
@@ -234,17 +238,24 @@ AssignPageserverConnstring(const char *newval, void *extra)
		elog(ERROR, "could not parse shard map");
	}

-	if (memcmp(&pagestore_shared->shard_map, &shard_map, sizeof(ShardMap)) != 0)
+	if (page_servers == NULL)
	{
-		pg_atomic_add_fetch_u64(&pagestore_shared->begin_update_counter, 1);
-		pg_write_barrier();
-		memcpy(&pagestore_shared->shard_map, &shard_map, sizeof(ShardMap));
-		pg_write_barrier();
-		pg_atomic_add_fetch_u64(&pagestore_shared->end_update_counter, 1);
+		page_servers = (PageServer*)calloc(NumberOfChannels(), sizeof(PageServer));
	}
-	else
+	for (size_t i = 0; i < old_num_shards * parallel_connections; i++)
	{
-		/* no change */
+		if (page_servers[i].state == PS_Connected)
+		{
+			/* TODO: race condition */
+			page_servers[i].state = PS_Expired;
+		}
+	}
+	for (size_t i = old_num_shards * parallel_connections; i < shard_map.num_shards * parallel_connections; i++)
+	{
+		pthread_t reader, writer;
+		void* chan_no = (void*)i;
+		pthread_create(&writer, NULL, communicator_write_loop, chan_no);
+		pthread_create(&reader, NULL, communicator_read_loop, chan_no);
	}
 }

@@ -254,7 +265,7 @@
 *
 * If num_shards_p is not NULL, it is set to the current number of shards.
 *
- * If connstr_p is not NULL, the connection string for 'shard_no' is copied to
+ * If connstr_p is not NULL, the connection string for 'chan_no' is copied to
 * it. It must point to a buffer at least MAX_PAGESERVER_CONNSTRING_SIZE bytes
 * long.
 *
@@ -262,12 +273,12 @@
 * last call, terminates all existing connections to all pageservers.
 */
 static void
-load_shard_map(shardno_t shard_no, char *connstr_p, shardno_t *num_shards_p)
+load_shard_map(int chan_no, char *connstr_p, int *num_shards_p)
 {
	uint64		begin_update_counter;
	uint64		end_update_counter;
	ShardMap   *shard_map = &pagestore_shared->shard_map;
-	shardno_t	num_shards;
+	int			num_shards;

	/*
	 * Postmaster can update the shared memory values concurrently, in which
@@ -282,24 +293,24 @@ load_shard_map(shardno_t shard_no, char *connstr_p, shardno_t *num_shards_p)
		end_update_counter = pg_atomic_read_u64(&pagestore_shared->end_update_counter);

		num_shards = shard_map->num_shards;
-		if (connstr_p && shard_no < MAX_SHARDS)
-			strlcpy(connstr_p, shard_map->connstring[shard_no], MAX_PAGESERVER_CONNSTRING_SIZE);
+		if (connstr_p && chan_no < MAX_SHARDS)
+			strlcpy(connstr_p, shard_map->connstring[chan_no], MAX_PAGESERVER_CONNSTRING_SIZE);
		pg_memory_barrier();
	} while (begin_update_counter != end_update_counter
			 || begin_update_counter != pg_atomic_read_u64(&pagestore_shared->begin_update_counter)
			 || end_update_counter != pg_atomic_read_u64(&pagestore_shared->end_update_counter));

-	if (connstr_p && shard_no >= num_shards)
+	if (connstr_p && chan_no >= num_shards)
		neon_log(ERROR, "Shard %d is greater or equal than number of shards %d",
-				 shard_no, num_shards);
+				 chan_no, num_shards);

	/*
	 * If any of the connection strings changed, reset all connections.
	 */
	if (pagestore_local_counter != end_update_counter)
	{
-		for (shardno_t i = 0; i < MAX_SHARDS; i++)
+		for (int i = 0; i < MAX_SHARDS; i++)
		{
			if (page_servers[i].conn)
				pageserver_disconnect(i);
@@ -313,41 +324,53 @@

 #define MB (1024*1024)

-shardno_t
-get_shard_number(BufferTag *tag)
+int
+get_shard_number(NRelFileInfo rinfo, BlockNumber blocknum)
 {
-	shardno_t	n_shards;
	uint32		hash;

-	load_shard_map(0, NULL, &n_shards);
-
 #if PG_MAJORVERSION_NUM < 16
-	hash = murmurhash32(tag->rnode.relNode);
-	hash = hash_combine(hash, murmurhash32(tag->blockNum / stripe_size));
+	hash = murmurhash32(rinfo.relNode);
+	hash = hash_combine(hash, murmurhash32(blocknum / stripe_size));
 #else
-	hash = murmurhash32(tag->relNumber);
-	hash = hash_combine(hash, murmurhash32(tag->blockNum / stripe_size));
+	hash = murmurhash32(rinfo.relNumber);
+	hash = hash_combine(hash, murmurhash32(blocknum / stripe_size));
 #endif

-	return hash % n_shards;
+	return hash % shard_map.num_shards;
 }

 static inline void
-CLEANUP_AND_DISCONNECT(PageServer *shard)
+cleanup_and_disconnect(PageServer *ps)
 {
-	if (shard->wes_read)
+	if (ps->wes_read)
	{
-		FreeWaitEventSet(shard->wes_read);
-		shard->wes_read = NULL;
+		FreeWaitEventSet(ps->wes_read);
+		ps->wes_read = NULL;
	}
-	if (shard->conn)
+	if (ps->conn)
	{
		MyNeonCounters->pageserver_disconnects_total++;
-		PQfinish(shard->conn);
-		shard->conn = NULL;
+		PQfinish(ps->conn);
+		ps->conn = NULL;
	}
-	shard->state = PS_Disconnected;
+	ps->state = PS_Disconnected;
+}
+
+/*
+ * Like pchomp but uses malloc instead of palloc, for thread safety
+ */
+static char *
+chomp(char const* in)
+{
+	size_t n;
+
+	n = strlen(in);
+	while (n > 0 && in[n - 1] == '\n')
+		n--;
+	return strndup(in, n);
 }

 /*
@@ -360,10 +383,11 @@
 * is canceled.
 */
 static bool
-pageserver_connect(shardno_t shard_no, int elevel)
+pageserver_connect(int chan_no, int elevel)
 {
-	PageServer *shard = &page_servers[shard_no];
+	PageServer *ps = &page_servers[chan_no];
	char		connstr[MAX_PAGESERVER_CONNSTRING_SIZE];
+	char		pagestream_query[MAX_PS_QUERY_LENGTH];

	/*
	 * Get the connection string for this shard. If the shard map has been
@@ -372,9 +396,9 @@
	 * Note that connstr is used both during connection start, and when we
	 * log the successful connection.
	 */
-	load_shard_map(shard_no, connstr, NULL);
+	load_shard_map(chan_no, connstr, NULL);

-	switch (shard->state)
+	switch (ps->state)
	{
		case PS_Disconnected:
			{
@@ -385,13 +409,13 @@
				int64		us_since_last_attempt;

				/* Make sure we start with a clean slate */
-				CLEANUP_AND_DISCONNECT(shard);
+				cleanup_and_disconnect(ps);

-				neon_shard_log(shard_no, DEBUG5, "Connection state: Disconnected");
+				neon_shard_log(chan_no, DEBUG5, "Connection state: Disconnected");

				now = GetCurrentTimestamp();
-				us_since_last_attempt = (int64) (now - shard->last_reconnect_time);
-				shard->last_reconnect_time = now;
+				us_since_last_attempt = (int64) (now - ps->last_reconnect_time);
+				ps->last_reconnect_time = now;

				/*
				 * Make sure we don't do exponential backoff with a constant multiplier
				 *
				 * cf. https://github.com/neondatabase/neon/issues/7897
				 */
-				if (shard->delay_us == 0)
-					shard->delay_us = MIN_RECONNECT_INTERVAL_USEC;
+				if (ps->delay_us == 0)
+					ps->delay_us = MIN_RECONNECT_INTERVAL_USEC;

				/*
				 * If we did other tasks between reconnect attempts, then we won't
				 * need to wait as long as a full delay.
				 */
-				if (us_since_last_attempt < shard->delay_us)
+				if (us_since_last_attempt < ps->delay_us)
				{
-					pg_usleep(shard->delay_us - us_since_last_attempt);
+					pg_usleep(ps->delay_us - us_since_last_attempt);
				}

				/* update the delay metric */
-				shard->delay_us = Min(shard->delay_us * 2, MAX_RECONNECT_INTERVAL_USEC);
+				ps->delay_us = Min(ps->delay_us * 2, MAX_RECONNECT_INTERVAL_USEC);

				/*
				 * Connect using the connection string we got from the
@@ -425,7 +449,7 @@
				 * we don't currently use that capability anywhere.
				 */
				keywords[0] = "dbname";
-				values[0] = connstr;
+				values[0] = shard_map.connstring[chan_no / parallel_connections];
				n_pgsql_params = 1;

				if (neon_auth_token)
@@ -438,28 +462,27 @@
				keywords[n_pgsql_params] = NULL;
				values[n_pgsql_params] = NULL;

-				shard->conn = PQconnectStartParams(keywords, values, 1);
-				if (PQstatus(shard->conn) == CONNECTION_BAD)
+				ps->conn = PQconnectStartParams(keywords, values, 1);
+				if (PQstatus(ps->conn) == CONNECTION_BAD)
				{
-					char	   *msg = pchomp(PQerrorMessage(shard->conn));
-					CLEANUP_AND_DISCONNECT(shard);
+					char	   *msg = chomp(PQerrorMessage(ps->conn));
+					cleanup_and_disconnect(ps);
					ereport(elevel,
							(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-							 errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
+							 errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", chan_no),
							 errdetail_internal("%s", msg)));
-					pfree(msg);
+					free(msg);
					return false;
				}
-				shard->state = PS_Connecting_Startup;
+				ps->state = PS_Connecting_Startup;
			}
			/* FALLTHROUGH */
		case PS_Connecting_Startup:
			{
-				char	   *pagestream_query;
				int			ps_send_query_ret;
				bool		connected = false;
				int			poll_result = PGRES_POLLING_WRITING;

-				neon_shard_log(shard_no, DEBUG5, "Connection state: Connecting_Startup");
+				neon_shard_log(chan_no, DEBUG5, "Connection state: Connecting_Startup");

				do
				{
@@ -468,24 +491,24 @@
						default: /* unknown/unused states are handled as a failed connection */
						case PGRES_POLLING_FAILED:
							{
-								char	   *pqerr = PQerrorMessage(shard->conn);
+								char	   *pqerr = PQerrorMessage(ps->conn);
								char	   *msg = NULL;

-								neon_shard_log(shard_no, DEBUG5, "POLLING_FAILED");
+								neon_shard_log(chan_no, DEBUG5, "POLLING_FAILED");

								if (pqerr)
-									msg = pchomp(pqerr);
+									msg = chomp(pqerr);

-								CLEANUP_AND_DISCONNECT(shard);
+								cleanup_and_disconnect(ps);

								if (msg)
								{
-									neon_shard_log(shard_no, elevel,
+									neon_shard_log(chan_no, elevel,
												   "could not connect to pageserver: %s", msg);
-									pfree(msg);
+									free(msg);
								}
								else
-									neon_shard_log(shard_no, elevel,
+									neon_shard_log(chan_no, elevel,
												   "could not connect to pageserver");

								return false;
@@ -496,7 +519,7 @@
							{
								int			rc = WaitLatchOrSocket(MyLatch,
																	WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_READABLE,
-																	PQsocket(shard->conn),
+																	PQsocket(ps->conn),
																	0, WAIT_EVENT_NEON_PS_STARTING);
								elog(DEBUG5, "PGRES_POLLING_READING=>%d", rc);
@@ -518,7 +541,7 @@
							{
								int			rc = WaitLatchOrSocket(MyLatch,
																	WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_WRITEABLE,
-																	PQsocket(shard->conn),
+																	PQsocket(ps->conn),
																	0, WAIT_EVENT_NEON_PS_STARTING);
								elog(DEBUG5, "PGRES_POLLING_WRITING=>%d", rc);
@@ -535,91 +558,93 @@
							break;

						case PGRES_POLLING_OK:
-							neon_shard_log(shard_no, DEBUG5, "POLLING_OK");
+							neon_shard_log(chan_no, DEBUG5, "POLLING_OK");
							connected = true;
							break;
					}

-					poll_result = PQconnectPoll(shard->conn);
+					poll_result = PQconnectPoll(ps->conn);
					elog(DEBUG5, "PQconnectPoll=>%d", poll_result);
				}
				while (!connected);

				/* No more polling needed; connection succeeded */
-				shard->last_connect_time = GetCurrentTimestamp();
+				ps->last_connect_time = GetCurrentTimestamp();

+				/* Allocate wait event set while holding the mutex */
+				pthread_mutex_lock(&mutex);
 #if PG_MAJORVERSION_NUM >= 17
-				shard->wes_read = CreateWaitEventSet(NULL, 3);
+				ps->wes_read = CreateWaitEventSet(NULL, 3);
 #else
-				shard->wes_read = CreateWaitEventSet(TopMemoryContext, 3);
+				ps->wes_read = CreateWaitEventSet(TopMemoryContext, 3);
 #endif
-				AddWaitEventToSet(shard->wes_read, WL_LATCH_SET, PGINVALID_SOCKET,
+				AddWaitEventToSet(ps->wes_read, WL_LATCH_SET, PGINVALID_SOCKET,
								  MyLatch, NULL);
-				AddWaitEventToSet(shard->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+				AddWaitEventToSet(ps->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
								  NULL, NULL);
-				AddWaitEventToSet(shard->wes_read, WL_SOCKET_READABLE, PQsocket(shard->conn), NULL, NULL);
+				AddWaitEventToSet(ps->wes_read, WL_SOCKET_READABLE, PQsocket(ps->conn), NULL, NULL);
+				pthread_mutex_unlock(&mutex);

				switch (neon_protocol_version)
				{
					case 3:
-						pagestream_query = psprintf("pagestream_v3 %s %s", neon_tenant, neon_timeline);
+						snprintf(pagestream_query, sizeof pagestream_query, "pagestream_v3 %s %s", neon_tenant, neon_timeline);
						break;
					case 2:
-						pagestream_query = psprintf("pagestream_v2 %s %s", neon_tenant, neon_timeline);
+						snprintf(pagestream_query, sizeof pagestream_query, "pagestream_v2 %s %s", neon_tenant, neon_timeline);
						break;
					default:
						elog(ERROR, "unexpected neon_protocol_version %d", neon_protocol_version);
				}

-				if (PQstatus(shard->conn) == CONNECTION_BAD)
+				if (PQstatus(ps->conn) == CONNECTION_BAD)
				{
-					char	   *msg = pchomp(PQerrorMessage(shard->conn));
+					char	   *msg = chomp(PQerrorMessage(ps->conn));

-					CLEANUP_AND_DISCONNECT(shard);
+					cleanup_and_disconnect(ps);
					ereport(elevel,
							(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-							 errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
+							 errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", chan_no),
							 errdetail_internal("%s", msg)));
-					pfree(msg);
+					free(msg);
					return false;
				}

-				ps_send_query_ret = PQsendQuery(shard->conn, pagestream_query);
-				pfree(pagestream_query);
+				ps_send_query_ret = PQsendQuery(ps->conn, pagestream_query);
				if (ps_send_query_ret != 1)
				{
-					CLEANUP_AND_DISCONNECT(shard);
+					cleanup_and_disconnect(ps);

-					neon_shard_log(shard_no, elevel, "could not send pagestream command to pageserver");
+					neon_shard_log(chan_no, elevel, "could not send pagestream command to pageserver");
					return false;
				}

-				shard->state = PS_Connecting_PageStream;
+				ps->state = PS_Connecting_PageStream;
			}
			/* FALLTHROUGH */
		case PS_Connecting_PageStream:
			{
-				neon_shard_log(shard_no, DEBUG5, "Connection state: Connecting_PageStream");
+				neon_shard_log(chan_no, DEBUG5, "Connection state: Connecting_PageStream");

-				if (PQstatus(shard->conn) == CONNECTION_BAD)
+				if (PQstatus(ps->conn) == CONNECTION_BAD)
				{
-					char	   *msg = pchomp(PQerrorMessage(shard->conn));
-					CLEANUP_AND_DISCONNECT(shard);
+					char	   *msg = chomp(PQerrorMessage(ps->conn));
+					cleanup_and_disconnect(ps);
					ereport(elevel,
							(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-							 errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
+							 errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", chan_no),
							 errdetail_internal("%s", msg)));
-					pfree(msg);
+					free(msg);
					return false;
				}

-				while (PQisBusy(shard->conn))
+				while (PQisBusy(ps->conn))
				{
					WaitEvent	event;

					/* Sleep until there's something to do */
-					(void) WaitEventSetWait(shard->wes_read, -1L, &event, 1,
+					(void) WaitEventSetWait(ps->wes_read, -1L, &event, 1,
											WAIT_EVENT_NEON_PS_CONFIGURING);
					ResetLatch(MyLatch);

@@ -628,22 +653,22 @@
					/* Data available in socket? */
					if (event.events & WL_SOCKET_READABLE)
					{
-						if (!PQconsumeInput(shard->conn))
+						if (!PQconsumeInput(ps->conn))
						{
-							char	   *msg = pchomp(PQerrorMessage(shard->conn));
+							char	   *msg = chomp(PQerrorMessage(ps->conn));

-							CLEANUP_AND_DISCONNECT(shard);
-							neon_shard_log(shard_no, elevel, "could not complete handshake with pageserver: %s",
+							cleanup_and_disconnect(ps);
+							neon_shard_log(chan_no, elevel, "could not complete handshake with pageserver: %s",
										   msg);
-							pfree(msg);
+							free(msg);
							return false;
						}
					}
				}

-				shard->state = PS_Connected;
-				shard->nrequests_sent = 0;
-				shard->nresponses_received = 0;
+				ps->state = PS_Connected;
+				ps->nrequests_sent = 0;
+				ps->nresponses_received = 0;
			}
			/* FALLTHROUGH */
		case PS_Connected:
			/*
@@ -651,13 +676,13 @@
			 * We successfully connected. Future connections to this PageServer
			 * will do fast retries again, with exponential backoff.
			 */
-			shard->delay_us = MIN_RECONNECT_INTERVAL_USEC;
+			ps->delay_us = MIN_RECONNECT_INTERVAL_USEC;

-			neon_shard_log(shard_no, DEBUG5, "Connection state: Connected");
-			neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s' with protocol version %d", connstr, neon_protocol_version);
+			neon_shard_log(chan_no, DEBUG5, "Connection state: Connected");
+			neon_shard_log(chan_no, LOG, "libpagestore: connected to '%s' with protocol version %d", connstr, neon_protocol_version);
			return true;
		default:
-			neon_shard_log(shard_no, ERROR, "libpagestore: invalid connection state %d", shard->state);
+			neon_shard_log(chan_no, ERROR, "libpagestore: invalid connection state %d", ps->state);
	}
	/* This shouldn't be hit */
	Assert(false);
@@ -667,11 +692,11 @@
 * A wrapper around PQgetCopyData that checks for interrupts while sleeping.
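+ * With the communicator, this is called from the per-channel receiver thread
+ * (communicator_read_loop) rather than from individual backends.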
*/ static int -call_PQgetCopyData(shardno_t shard_no, char **buffer) +call_PQgetCopyData(int chan_no, char **buffer) { int ret; - PageServer *shard = &page_servers[shard_no]; - PGconn *pageserver_conn = shard->conn; + PageServer *ps = &page_servers[chan_no]; + PGconn *pageserver_conn = ps->conn; instr_time now, start_ts, since_start, @@ -705,7 +730,7 @@ retry: timeout = Max(0, LOG_INTERVAL_MS - INSTR_TIME_GET_MILLISEC(since_last_log)); /* Sleep until there's something to do */ - (void) WaitEventSetWait(shard->wes_read, timeout, &event, 1, + (void) WaitEventSetWait(ps->wes_read, timeout, &event, 1, WAIT_EVENT_NEON_PS_READ); ResetLatch(MyLatch); @@ -716,10 +741,10 @@ retry: { if (!PQconsumeInput(pageserver_conn)) { - char *msg = pchomp(PQerrorMessage(pageserver_conn)); + char *msg = chomp(PQerrorMessage(pageserver_conn)); - neon_shard_log(shard_no, LOG, "could not get response from pageserver: %s", msg); - pfree(msg); + neon_shard_log(chan_no, LOG, "could not get response from pageserver: %s", msg); + free(msg); return -1; } } @@ -760,9 +785,9 @@ retry: } } #endif - neon_shard_log(shard_no, LOG, "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses) (socket sndbuf=%d recvbuf=%d)", + neon_shard_log(chan_no, LOG, "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses) (socket sndbuf=%d recvbuf=%d)", INSTR_TIME_GET_DOUBLE(since_start), - shard->nrequests_sent, shard->nresponses_received, sndbuf, recvbuf); + ps->nrequests_sent, ps->nresponses_received, sndbuf, recvbuf); last_log_ts = now; logged = true; } @@ -779,39 +804,20 @@ retry: INSTR_TIME_SET_CURRENT(now); since_start = now; INSTR_TIME_SUBTRACT(since_start, start_ts); - neon_shard_log(shard_no, LOG, "received response from pageserver after %0.3f s", + neon_shard_log(chan_no, LOG, "received response from pageserver after %0.3f s", INSTR_TIME_GET_DOUBLE(since_start)); } return ret; } -/* - * Reset prefetch and drop connection to the shard. - * It also drops connection to all other shards involved in prefetch, through - * prefetch_on_ps_disconnect(). - */ -static void -pageserver_disconnect(shardno_t shard_no) -{ - /* - * If the connection to any pageserver is lost, we throw away the - * whole prefetch queue, even for other pageservers. It should not - * cause big problems, because connection loss is supposed to be a - * rare event. - */ - prefetch_on_ps_disconnect(); - - pageserver_disconnect_shard(shard_no); -} - /* * Disconnect from specified shard */ static void -pageserver_disconnect_shard(shardno_t shard_no) +pageserver_disconnect(int chan_no) { - PageServer *shard = &page_servers[shard_no]; + PageServer *ps = &page_servers[chan_no]; /* * If anything goes wrong while we were sending a request, it's not clear * what state the connection is in. For example, if we sent the request @@ -823,30 +829,27 @@ pageserver_disconnect_shard(shardno_t shard_no) * of the wait event sets or the psql connection, or failed when we tried * to attach wait events to the WaitEventSets. 
 */
-	CLEANUP_AND_DISCONNECT(shard);
+	cleanup_and_disconnect(ps);

-	shard->state = PS_Disconnected;
+	ps->state = PS_Disconnected;
 }

 static bool
-pageserver_send(shardno_t shard_no, NeonRequest *request)
+pageserver_send(int chan_no, StringInfo msg)
 {
-	StringInfoData req_buff;
-	PageServer *shard = &page_servers[shard_no];
+	PageServer *ps = &page_servers[chan_no];
	PGconn	   *pageserver_conn;

	MyNeonCounters->pageserver_requests_sent_total++;

	/* If the connection was lost for some reason, reconnect */
-	if (shard->state == PS_Connected && PQstatus(shard->conn) == CONNECTION_BAD)
+	if (ps->state == PS_Connected && PQstatus(ps->conn) == CONNECTION_BAD)
	{
-		neon_shard_log(shard_no, LOG, "pageserver_send disconnect bad connection");
-		pageserver_disconnect(shard_no);
+		neon_shard_log(chan_no, LOG, "pageserver_send disconnect bad connection");
+		pageserver_disconnect(chan_no);
		pageserver_conn = NULL;
	}

-	req_buff = nm_pack_request(request);
-
	/*
	 * If pageserver is stopped, the connections from compute node are broken.
	 * The compute node doesn't notice that immediately, but it will cause the
@@ -856,18 +859,24 @@
	 * https://github.com/neondatabase/neon/issues/1138 So try to reestablish
	 * connection in case of failure.
	 */
-	if (shard->state != PS_Connected)
+	if (ps->state != PS_Connected)
	{
-		while (!pageserver_connect(shard_no, shard->n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
+		if (ps->state == PS_Expired)
		{
-			shard->n_reconnect_attempts += 1;
+			neon_shard_log(chan_no, LOG, "pageserver_send disconnect expired connection");
+			pageserver_disconnect(chan_no);
+			pageserver_conn = NULL;
		}
-		shard->n_reconnect_attempts = 0;
+		while (!pageserver_connect(chan_no, LOG))
+		{
+			ps->n_reconnect_attempts += 1;
+		}
+		ps->n_reconnect_attempts = 0;
	}
	else
	{
-		Assert(shard->conn != NULL);
+		Assert(ps->conn != NULL);
	}
-	pageserver_conn = shard->conn;
+	pageserver_conn = ps->conn;

	/*
	 * Send request.
@@ -881,52 +890,50 @@
	 * PGRES_POLLING_WRITING state. It's kinda dirty to disconnect at this
	 * point, but on the grand scheme of things it's only a small issue.
	 */
-	shard->nrequests_sent++;
-	if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
+	ps->nrequests_sent++;
+	if (PQputCopyData(pageserver_conn, msg->data, msg->len) <= 0)
	{
-		char	   *msg = pchomp(PQerrorMessage(pageserver_conn));
+		char	   *errmsg = chomp(PQerrorMessage(pageserver_conn));

-		pageserver_disconnect(shard_no);
-		neon_shard_log(shard_no, LOG, "pageserver_send disconnected: failed to send page request (try to reconnect): %s", msg);
-		pfree(msg);
-		pfree(req_buff.data);
+		pageserver_disconnect(chan_no);
+		neon_shard_log(chan_no, LOG, "pageserver_send disconnected: failed to send page request (try to reconnect): %s", errmsg);
+		free(errmsg);
		return false;
	}
-
-	pfree(req_buff.data);
-
-	if (message_level_is_interesting(PageStoreTrace))
+	if (PQflush(pageserver_conn))
	{
-		char	   *msg = nm_to_string((NeonMessage *) request);
+		char	   *errmsg = chomp(PQerrorMessage(pageserver_conn));

-		neon_shard_log(shard_no, PageStoreTrace, "sent request: %s", msg);
-		pfree(msg);
+		pageserver_disconnect(chan_no);
+		neon_shard_log(chan_no, LOG, "pageserver_flush disconnect because failed to flush page requests: %s", errmsg);
+		free(errmsg);
+		return false;
	}
-
	return true;
 }

 static NeonResponse *
-pageserver_receive(shardno_t shard_no)
+pageserver_receive(int chan_no)
 {
	StringInfoData resp_buff;
	NeonResponse *resp;
-	PageServer *shard = &page_servers[shard_no];
-	PGconn	   *pageserver_conn = shard->conn;
+	PageServer *ps = &page_servers[chan_no];
+	PGconn	   *pageserver_conn = ps->conn;
	/* read response */
	int			rc;

-	if (shard->state != PS_Connected)
+	/* TODO: fix race condition between sender and receiver */
+	if (ps->state != PS_Connected)
	{
-		neon_shard_log(shard_no, LOG,
+		neon_shard_log(chan_no, LOG,
					   "pageserver_receive: returning NULL for non-connected pageserver connection: 0x%02x",
-					   shard->state);
+					   ps->state);
		return NULL;
	}

	Assert(pageserver_conn);

-	rc = call_PQgetCopyData(shard_no, &resp_buff.data);
+	rc = call_PQgetCopyData(chan_no, &resp_buff.data);
	if (rc >= 0)
	{
		/* call_PQgetCopyData handles rc == 0 */
@@ -941,146 +948,35 @@
		}
		PG_CATCH();
		{
-			neon_shard_log(shard_no, LOG, "pageserver_receive: disconnect due to failure while parsing response");
-			pageserver_disconnect(shard_no);
+			neon_shard_log(chan_no, LOG, "pageserver_receive: disconnect due to failure while parsing response");
+			pageserver_disconnect(chan_no);
			PG_RE_THROW();
		}
		PG_END_TRY();
-
-		if (message_level_is_interesting(PageStoreTrace))
-		{
-			char	   *msg = nm_to_string((NeonMessage *) resp);
-
-			neon_shard_log(shard_no, PageStoreTrace, "got response: %s", msg);
-			pfree(msg);
-		}
	}
	else if (rc == -1)
	{
-		neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: psql end of copy data: %s", pchomp(PQerrorMessage(pageserver_conn)));
-		pageserver_disconnect(shard_no);
+		neon_shard_log(chan_no, LOG, "pageserver_receive disconnect: psql end of copy data: %s", chomp(PQerrorMessage(pageserver_conn)));
+		pageserver_disconnect(chan_no);
		resp = NULL;
	}
	else if (rc == -2)
	{
-		char	   *msg = pchomp(PQerrorMessage(pageserver_conn));
+		char	   *msg = chomp(PQerrorMessage(pageserver_conn));

-		pageserver_disconnect(shard_no);
-		neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: could not read COPY data: %s", msg);
+		pageserver_disconnect(chan_no);
+		neon_shard_log(chan_no, ERROR, "pageserver_receive disconnect: could not read COPY data: %s", msg);
	}
	else
	{
-		pageserver_disconnect(shard_no);
-		neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc);
+		pageserver_disconnect(chan_no);
+		neon_shard_log(chan_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc);
	}

-	shard->nresponses_received++;
+	ps->nresponses_received++;
	return (NeonResponse *) resp;
 }

-static NeonResponse *
-pageserver_try_receive(shardno_t shard_no)
-{
-	StringInfoData resp_buff;
-	NeonResponse *resp;
-	PageServer *shard = &page_servers[shard_no];
-	PGconn	   *pageserver_conn = shard->conn;
-	/* read response */
-	int			rc;
-
-	if (shard->state != PS_Connected)
-		return NULL;
-
-	Assert(pageserver_conn);
-
-	rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async = true */);
-
-	if (rc == 0)
-		return NULL;
-	else if (rc > 0)
-	{
-		PG_TRY();
-		{
-			resp_buff.len = rc;
-			resp_buff.cursor = 0;
-			resp = nm_unpack_response(&resp_buff);
-			PQfreemem(resp_buff.data);
-		}
-		PG_CATCH();
-		{
-			neon_shard_log(shard_no, LOG, "pageserver_receive: disconnect due to failure while parsing response");
-			pageserver_disconnect(shard_no);
-			PG_RE_THROW();
-		}
-		PG_END_TRY();
-
-		if (message_level_is_interesting(PageStoreTrace))
-		{
-			char	   *msg = nm_to_string((NeonMessage *) resp);
-
-			neon_shard_log(shard_no, PageStoreTrace, "got response: %s", msg);
-			pfree(msg);
-		}
-	}
-	else if (rc == -1)
-	{
-		neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: psql end of copy data: %s", pchomp(PQerrorMessage(pageserver_conn)));
-		pageserver_disconnect(shard_no);
-		resp = NULL;
-	}
-	else if (rc == -2)
-	{
-		char	   *msg = pchomp(PQerrorMessage(pageserver_conn));
-
-		pageserver_disconnect(shard_no);
-		neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: could not read COPY data: %s", msg);
-	}
-	else
-	{
-		pageserver_disconnect(shard_no);
-		neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc);
-	}
-
-	shard->nresponses_received++;
-	return (NeonResponse *) resp;
-}
-
-
-static bool
-pageserver_flush(shardno_t shard_no)
-{
-	PGconn	   *pageserver_conn = page_servers[shard_no].conn;
-
-	if (page_servers[shard_no].state != PS_Connected)
-	{
-		neon_shard_log(shard_no, WARNING, "Tried to flush while disconnected");
-	}
-	else
-	{
-		MyNeonCounters->pageserver_send_flushes_total++;
-		if (PQflush(pageserver_conn))
-		{
-			char	   *msg = pchomp(PQerrorMessage(pageserver_conn));
-
-			pageserver_disconnect(shard_no);
-			neon_shard_log(shard_no, LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg);
-			pfree(msg);
-			return false;
-		}
-	}
-
-	return true;
-}
-
-page_server_api api =
-{
-	.send = pageserver_send,
-	.flush = pageserver_flush,
-	.receive = pageserver_receive,
-	.try_receive = pageserver_try_receive,
-	.disconnect = pageserver_disconnect_shard
-};
-
 static bool
 check_neon_id(char **newval, void **extra, GucSource source)
 {
@@ -1089,10 +985,30 @@ check_neon_id(char **newval, void **extra, GucSource source)
	return **newval == '\0' || HexDecodeString(id, *newval, 16);
 }

+static Size
+RequestBufferSize(void)
+{
+	return (readahead_buffer_size + PG_IOV_MAX) * ((MaxBackends + parallel_connections - 1) / parallel_connections);
+}
+
+static Size
+NumberOfChannels(void)
+{
+	return MAX_SHARDS * parallel_connections;
+}
+
+static Size
+CommunicatorShmemSize(void)
+{
+	return RequestBufferSize() * NumberOfChannels() * sizeof(NeonCommunicatorRequest)
+		+ NumberOfChannels() * sizeof(NeonCommunicatorChannel)
+		+ sizeof(NeonCommunicatorResponse) * MaxBackends;
+}
+
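+/*
+ * Sketch of the shared memory layout assumed by CommunicatorShmemSize() and
+ * carved up in PagestoreShmemInit(): an array of NumberOfChannels() channel
+ * descriptors, followed by one request ring of RequestBufferSize() entries
+ * per channel, followed by one NeonCommunicatorResponse slot per backend.
+ */
+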
 static Size
 PagestoreShmemSize(void)
 {
-	return add_size(sizeof(PagestoreShmemState), NeonPerfCountersShmemSize());
+	return CommunicatorShmemSize() + NeonPerfCountersShmemSize();
 }

 static bool
@@ -1100,16 +1016,29 @@ PagestoreShmemInit(void)
 {
	bool		found;

+#if PG_VERSION_NUM < 170000
+	MyProcNumber = MyProc->pgprocno;
+#endif
+
	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
-	pagestore_shared = ShmemInitStruct("libpagestore shared state",
-									   sizeof(PagestoreShmemState),
-									   &found);
+	channels = ShmemInitStruct("communicator shared state",
							   CommunicatorShmemSize(),
							   &found);
	if (!found)
	{
-		pg_atomic_init_u64(&pagestore_shared->begin_update_counter, 0);
-		pg_atomic_init_u64(&pagestore_shared->end_update_counter, 0);
-		memset(&pagestore_shared->shard_map, 0, sizeof(ShardMap));
-		AssignPageserverConnstring(page_server_connstring, NULL);
+		size_t n_channels = NumberOfChannels();
+		NeonCommunicatorRequest* requests = (NeonCommunicatorRequest*)(channels + n_channels);
+
+		for (size_t i = 0; i < n_channels; i++)
+		{
+			NeonCommunicatorChannel* chan = &channels[i];
+			pg_atomic_init_u64(&chan->write_pos, 0);
+			pg_atomic_init_u64(&chan->read_pos, 0);
+			InitLatch(&chan->latch);
+			chan->requests = requests;
+			requests += RequestBufferSize();
+		}
+		responses = (NeonCommunicatorResponse*)requests;
	}

	NeonPerfCountersShmemInit();
@@ -1151,6 +1080,93 @@ pagestore_prepare_shmem(void)
	shmem_startup_hook = pagestore_shmem_startup_hook;
 }

+void
+communicator_send_request(int shard, NeonCommunicatorRequest* req)
+{
+	/* bind backend to the particular channel */
+	NeonCommunicatorChannel* chan = &channels[shard * parallel_connections + (MyBackendId % parallel_connections)];
+	size_t ring_size = RequestBufferSize();
+	uint64 write_pos = pg_atomic_add_fetch_u64(&chan->write_pos, 1); /* reserve write position */
+	uint64 read_pos;
+
+	Assert(req->hdr.u.reqid == 0);	/* ring overflow should not happen */
+	req->hdr.u.recepient.procno = MyProcNumber;
+
+	/* copy request */
+	chan->requests[(size_t)(write_pos % ring_size)] = *req;
+
+	/* advance read-up-to position */
+	do {
+		read_pos = write_pos;
+	} while (!pg_atomic_compare_exchange_u64(&chan->read_pos, &read_pos, write_pos+1) && read_pos <= write_pos);
+
+	responses[MyProcNumber].tag = req->hdr.tag;
+	SetLatch(&chan->latch);
+}
+
+int64
+communicator_receive_response(void)
+{
+	while (responses[MyProcNumber].tag <= T_NeonTestRequest) /* response not yet received */
+	{
+		(void) WaitLatch(MyLatch,
+						 WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
+						 -1L,
+						 WAIT_EVENT_NEON_PS_READ);
+		ResetLatch(MyLatch);
+	}
+	if (responses[MyProcNumber].tag == T_NeonErrorResponse)
+	{
+		elog(ERROR, "Request failed"); /* detailed error response is printed by communicator */
+	}
+	return responses[MyProcNumber].value;
+}
+
+int64
+communicator_request(int shard, NeonCommunicatorRequest* req)
+{
+	communicator_send_request(shard, req);
+	return communicator_receive_response();
+}
+
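+/*
+ * Main entry point of the communicator background worker. The worker process
+ * itself just waits on its latch until shutdown is requested; the per-channel
+ * writer and reader threads (spawned from AssignPageserverConnstring) carry
+ * the actual request/response traffic with the page servers.
+ */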
+void
+CommunicatorMain(Datum main_arg)
+{
+	am_communicator = true;
+
+	/* Establish signal handlers. */
+	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
+	BackgroundWorkerUnblockSignals();
+
+	while (!ShutdownRequestPending)
+	{
+		(void) WaitLatch(MyLatch,
+						 WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
+						 -1L,
+						 PG_WAIT_EXTENSION);
+		CHECK_FOR_INTERRUPTS();
+	}
+}
+
+static void
+register_communicator_worker(void)
+{
+	BackgroundWorker bgw;
+	memset(&bgw, 0, sizeof(bgw));
+	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
+	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
+	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "CommunicatorMain");
+	snprintf(bgw.bgw_name, BGW_MAXLEN, "Page server communicator");
+	snprintf(bgw.bgw_type, BGW_MAXLEN, "Page server communicator");
+	bgw.bgw_restart_time = 5;
+	bgw.bgw_notify_pid = 0;
+	bgw.bgw_main_arg = (Datum) 0;
+
+	RegisterBackgroundWorker(&bgw);
+}
+
 /*
 * Module initialization function
 */
@@ -1203,14 +1219,6 @@ pg_init_libpagestore(void)
							PGC_SIGHUP,
							GUC_UNIT_MB,
							NULL, NULL, NULL);
-	DefineCustomIntVariable("neon.flush_output_after",
-							"Flush the output buffer after every N unflushed requests",
-							NULL,
-							&flush_every_n_requests,
-							8, -1, INT_MAX,
-							PGC_USERSET,
-							0,	/* no flags required */
-							NULL, NULL, NULL);
	DefineCustomIntVariable("neon.max_reconnect_attempts",
							"Maximal attempts to reconnect to pages server (with 1 second timeout)",
							NULL,
@@ -1229,15 +1237,23 @@ pg_init_libpagestore(void)
							"values for these settings.",
							&readahead_buffer_size,
							128, 16, 1024,
-							PGC_USERSET,
+							PGC_POSTMASTER,
							0,	/* no flags required */
-							NULL, (GucIntAssignHook) &readahead_buffer_resize, NULL);
+							NULL, NULL, NULL);
+	DefineCustomIntVariable("neon.parallel_connections",
+							"number of connections to each shard",
+							NULL,
+							&parallel_connections,
+							10, 1, 16,
+							PGC_POSTMASTER,
+							0,	/* no flags required */
+							NULL, NULL, NULL);
	DefineCustomIntVariable("neon.protocol_version",
							"Version of compute<->page server protocol",
							NULL,
							&neon_protocol_version,
-							2,	/* use protocol version 2 */
-							2,	/* min */
+							3,	/* use protocol version 3 */
+							3,	/* min */
							3,	/* max */
							PGC_SU_BACKEND,
							0,	/* no flags required */
@@ -1245,6 +1261,8 @@ pg_init_libpagestore(void)

	relsize_hash_init();

+	register_communicator_worker();
+
	if (page_server != NULL)
		neon_log(ERROR, "libpagestore already loaded");

@@ -1271,3 +1289,101 @@ pg_init_libpagestore(void)

	lfc_init();
 }
+
+static void
+allocStringInfo(StringInfo s, size_t size)
+{
+	s->data = (char *)malloc(size);
+	s->maxlen = size;
+	resetStringInfo(s);
+}
+
+static void*
+communicator_write_loop(void* arg)
+{
+	uint64 read_start_pos = 0;
+	size_t chan_no = (size_t)arg;
+	NeonCommunicatorChannel* chan = &channels[chan_no];
+	size_t ring_size = RequestBufferSize();
+	StringInfoData s;
+
+	allocStringInfo(&s, MAX_REQUEST_SIZE);
+
+	while (true)
+	{
+		NeonCommunicatorRequest* req;
+		uint64 read_end_pos;
+
+		/* Number of shards has decreased */
+		if (chan_no >= shard_map.num_shards * parallel_connections)
+			return NULL;
+
+		read_end_pos = pg_atomic_read_u64(&chan->read_pos);
+		Assert(read_start_pos <= read_end_pos);
+		while (read_start_pos == read_end_pos)
+		{
+			int events = WaitLatch(&chan->latch, WL_LATCH_SET|WL_POSTMASTER_DEATH, -1L, WAIT_EVENT_NEON_PS_SEND);
+			if (events & WL_POSTMASTER_DEATH)
+				return NULL;
+			ResetLatch(&chan->latch);
+			read_end_pos = pg_atomic_read_u64(&chan->read_pos);
+		}
+		req = &chan->requests[read_start_pos++ % ring_size];
+		nm_pack_request(&s, &req->hdr);
+		pageserver_send(chan_no, &s);
+		req->hdr.u.reqid = 0;	/* mark request as processed */
+	}
+}
+
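+/*
+ * Receiver thread (one per channel): reads responses from the page server
+ * connection, stores prefetched pages in the LFC, copies requested pages into
+ * the originating backend's buffer, publishes the result in the backend's
+ * response slot and wakes the backend up through its latch.
+ */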
+static void*
+communicator_read_loop(void* arg)
+{
+	NeonResponse* resp;
+	int64 value = 0;
+	size_t chan_no = (size_t)arg;
+
+	while (true)
+	{
+		/* Number of shards has decreased */
+		if (chan_no >= shard_map.num_shards * parallel_connections)
+			return NULL;
+
+		resp = pageserver_receive(chan_no);
+		if (resp == NULL)
+		{
+			pg_usleep(RECEIVER_RETRY_DELAY);
+			continue;
+		}
+		switch (resp->tag)
+		{
+			case T_NeonExistsResponse:
+				value = ((NeonExistsResponse*)resp)->exists;
+				break;
+			case T_NeonNblocksResponse:
+				value = ((NeonNblocksResponse*)resp)->n_blocks;
+				break;
+			case T_NeonDbSizeResponse:
+				value = ((NeonDbSizeResponse*)resp)->db_size;
+				break;
+			case T_NeonGetPageResponse:
+			{
+				NeonGetPageResponse* page_resp = (NeonGetPageResponse*)resp;
+				if (resp->u.recepient.bufid == InvalidBuffer)
+				{
+					/* result of prefetch */
+					lfc_prefetch(page_resp->req.rinfo, page_resp->req.forknum, page_resp->req.blkno, page_resp->page, resp->not_modified_since);
+					free(resp);
+					continue;	/* no need to notify backend */
+				}
+				memcpy(BufferGetBlock(resp->u.recepient.bufid), page_resp->page, BLCKSZ);
+				break;
+			}
+			case T_NeonErrorResponse:
+				log_error_message((NeonErrorResponse *) resp);
+				break;
+			default:
+				break;
+		}
+		responses[resp->u.recepient.procno].value = value;
+		responses[resp->u.recepient.procno].tag = resp->tag;
+		SetLatch(&ProcGlobal->allProcs[resp->u.recepient.procno].procLatch);
+		free(resp);
+	}
+}
+
diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h
index 7b748d7252..b6f4c131f5 100644
--- a/pgxn/neon/pagestore_client.h
+++ b/pgxn/neon/pagestore_client.h
@@ -54,7 +54,13 @@ typedef uint64 NeonRequestId;
 typedef struct
 {
	NeonMessageTag tag;
-	NeonRequestId reqid;
+	union {
+		struct {
+			int procno;		/* process number */
+			Buffer bufid;	/* InvalidBuffer for prefetch */
+		} recepient;
+		NeonRequestId reqid;	/* the two fields above temporarily replace reqid, just to preserve the wire protocol */
+	} u;
	XLogRecPtr	lsn;
	XLogRecPtr	not_modified_since;
 } NeonMessage;
@@ -140,6 +146,30 @@ typedef struct
	int			segno;
 } NeonGetSlruSegmentRequest;

+typedef union {
+	NeonRequest hdr;
+	NeonExistsRequest exists;
+	NeonNblocksRequest nblocks;
+	NeonDbSizeRequest dbsize;
+	NeonGetPageRequest page;
+	NeonGetSlruSegmentRequest slru;
+} NeonCommunicatorRequest;
+
+
+typedef struct
+{
+	NeonMessageTag tag;
+	int64 value;
+} NeonCommunicatorResponse;
+
+typedef struct
+{
+	pg_atomic_uint64 write_pos;
+	pg_atomic_uint64 read_pos;
+	Latch latch;
+	NeonCommunicatorRequest* requests;
+} NeonCommunicatorChannel;
+
 /* supertype of all the Neon*Response structs below */
 typedef NeonMessage NeonResponse;

@@ -161,6 +191,7 @@ typedef struct
	char		page[FLEXIBLE_ARRAY_MEMBER];
 } NeonGetPageResponse;

+
 #define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ))

 typedef struct
@@ -184,50 +215,14 @@ typedef struct
 } NeonGetSlruSegmentResponse;

-extern StringInfoData nm_pack_request(NeonRequest *msg);
+extern void nm_pack_request(StringInfo s, NeonRequest *msg);
 extern NeonResponse *nm_unpack_response(StringInfo s);
-extern char *nm_to_string(NeonMessage *msg);

 /*
 * API
 */
-typedef uint16 shardno_t;
-
-typedef struct
-{
-	/*
-	 * Send this request to the PageServer associated with this shard.
-	 */
-	bool		(*send) (shardno_t shard_no, NeonRequest * request);
-	/*
-	 * Blocking read for the next response of this shard.
-	 *
-	 * When a CANCEL signal is handled, the connection state will be
-	 * unmodified.
-	 */
-	NeonResponse *(*receive) (shardno_t shard_no);
-	/*
-	 * Try get the next response from the TCP buffers, if any.
-	 * Returns NULL when the data is not yet available.
-	 */
-	NeonResponse *(*try_receive) (shardno_t shard_no);
-	/*
-	 * Make sure all requests are sent to PageServer.
-	 */
-	bool		(*flush) (shardno_t shard_no);
-	/*
-	 * Disconnect from this pageserver shard.
-	 */
-	void		(*disconnect) (shardno_t shard_no);
-} page_server_api;
-
-extern void prefetch_on_ps_disconnect(void);
-
-extern page_server_api *page_server;
-
 extern char *page_server_connstring;
-extern int	flush_every_n_requests;
 extern int	readahead_buffer_size;
 extern char *neon_timeline;
 extern char *neon_tenant;
@@ -319,4 +314,9 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
	return lfc_writev(rinfo, forkNum, blkno, &buffer, 1);
 }

+
+extern void communicator_send_request(int shard, NeonCommunicatorRequest* req);
+extern int64 communicator_receive_response(void);
+extern int64 communicator_request(int shard, NeonCommunicatorRequest* req);
+
 #endif
diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c
index f1087a8ccb..0795437999 100644
--- a/pgxn/neon/pagestore_smgr.c
+++ b/pgxn/neon/pagestore_smgr.c
@@ -101,8 +101,6 @@ const int	SmgrTrace = DEBUG5;
			neon_shard_log(shard_no, elvl, "Broken connection state: " message, \
						   ##__VA_ARGS__)

-page_server_api *page_server;
-
 /* unlogged relation build states */
 typedef enum
 {
@@ -120,1135 +118,14 @@ static bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block
 static BlockNumber neon_nblocks(SMgrRelation reln, ForkNumber forknum);

-static uint32 local_request_counter;
-#define GENERATE_REQUEST_ID() (((NeonRequestId)MyProcPid << 32) | ++local_request_counter)
-
-/*
- * Prefetch implementation:
- *
- * Prefetch is performed locally by each backend.
- *
- * There can be up to readahead_buffer_size active IO requests registered at
- * any time. Requests using smgr_prefetch are sent to the pageserver, but we
- * don't wait on the response. Requests using smgr_read are either read from
- * the buffer, or (if that's not possible) we wait on the response to arrive -
- * this also will allow us to receive other prefetched pages.
- * Each request is immediately written to the output buffer of the pageserver
- * connection, but may not be flushed if smgr_prefetch is used: pageserver
- * flushes sent requests on manual flush, or every neon.flush_output_after
- * unflushed requests; which is not necessarily always and all the time.
- *
- * Once we have received a response, this value will be stored in the response
- * buffer, indexed in a hash table. This allows us to retain our buffered
- * prefetch responses even when we have cache misses.
- *
- * Reading of prefetch responses is delayed until them are actually needed
- * (smgr_read). In case of prefetch miss or any other SMGR request other than
- * smgr_read, all prefetch responses in the pipeline will need to be read from
- * the connection; the responses are stored for later use.
- *
- * NOTE: The current implementation of the prefetch system implements a ring
- * buffer of up to readahead_buffer_size requests. If there are more _read and
- * _prefetch requests between the initial _prefetch and the _read of a buffer,
- * the prefetch request will have been dropped from this prefetch buffer, and
- * your prefetch was wasted.
- */ - -/* - * State machine: - * - * not in hash : in hash - * : - * UNUSED ------> REQUESTED --> RECEIVED - * ^ : | | - * | : v | - * | : TAG_UNUSED | - * | : | | - * +----------------+------------+ - * : - */ -typedef enum PrefetchStatus -{ - PRFS_UNUSED = 0, /* unused slot */ - PRFS_REQUESTED, /* request was written to the sendbuffer to - * PS, but not necessarily flushed. all fields - * except response valid */ - PRFS_RECEIVED, /* all fields valid */ - PRFS_TAG_REMAINS, /* only buftag and my_ring_index are still - * valid */ -} PrefetchStatus; - -/* must fit in uint8; bits 0x1 are used */ -typedef enum { - PRFSF_NONE = 0x0, - PRFSF_SEQ = 0x1, -} PrefetchRequestFlags; - -typedef struct PrefetchRequest -{ - BufferTag buftag; /* must be first entry in the struct */ - shardno_t shard_no; - uint8 status; /* see PrefetchStatus for valid values */ - uint8 flags; /* see PrefetchRequestFlags */ - neon_request_lsns request_lsns; - NeonRequestId reqid; - NeonResponse *response; /* may be null */ - uint64 my_ring_index; -} PrefetchRequest; - -/* prefetch buffer lookup hash table */ - -typedef struct PrfHashEntry -{ - PrefetchRequest *slot; - uint32 status; - uint32 hash; -} PrfHashEntry; - -#define SH_PREFIX prfh -#define SH_ELEMENT_TYPE PrfHashEntry -#define SH_KEY_TYPE PrefetchRequest * -#define SH_KEY slot -#define SH_STORE_HASH -#define SH_GET_HASH(tb, a) ((a)->hash) -#define SH_HASH_KEY(tb, key) hash_bytes( \ - ((const unsigned char *) &(key)->buftag), \ - sizeof(BufferTag) \ -) - -#define SH_EQUAL(tb, a, b) (BufferTagsEqual(&(a)->buftag, &(b)->buftag)) -#define SH_SCOPE static inline -#define SH_DEFINE -#define SH_DECLARE -#include "lib/simplehash.h" -#include "neon.h" - -/* - * PrefetchState maintains the state of (prefetch) getPage@LSN requests. - * It maintains a (ring) buffer of in-flight requests and responses. - * - * We maintain several indexes into the ring buffer: - * ring_unused >= ring_flush >= ring_receive >= ring_last >= 0 - * - * ring_unused points to the first unused slot of the buffer - * ring_receive is the next request that is to be received - * ring_last is the oldest received entry in the buffer - * - * Apart from being an entry in the ring buffer of prefetch requests, each - * PrefetchRequest that is not UNUSED is indexed in prf_hash by buftag. 
- */ -typedef struct PrefetchState -{ - MemoryContext bufctx; /* context for prf_buffer[].response - * allocations */ - MemoryContext errctx; /* context for prf_buffer[].response - * allocations */ - MemoryContext hashctx; /* context for prf_buffer */ - - /* buffer indexes */ - uint64 ring_unused; /* first unused slot */ - uint64 ring_flush; /* next request to flush */ - uint64 ring_receive; /* next slot that is to receive a response */ - uint64 ring_last; /* min slot with a response value */ - - /* metrics / statistics */ - int n_responses_buffered; /* count of PS responses not yet in - * buffers */ - int n_requests_inflight; /* count of PS requests considered in - * flight */ - int n_unused; /* count of buffers < unused, > last, that are - * also unused */ - - /* the buffers */ - prfh_hash *prf_hash; - int max_shard_no; - /* Mark shards involved in prefetch */ - uint8 shard_bitmap[(MAX_SHARDS + 7)/8]; - PrefetchRequest prf_buffer[]; /* prefetch buffers */ -} PrefetchState; - -static PrefetchState *MyPState; - -#define GetPrfSlotNoCheck(ring_index) ( \ - &MyPState->prf_buffer[((ring_index) % readahead_buffer_size)] \ -) - -#define GetPrfSlot(ring_index) ( \ - ( \ - AssertMacro((ring_index) < MyPState->ring_unused && \ - (ring_index) >= MyPState->ring_last), \ - GetPrfSlotNoCheck(ring_index) \ - ) \ -) - -#define ReceiveBufferNeedsCompaction() (\ - (MyPState->n_responses_buffered / 8) < ( \ - MyPState->ring_receive - \ - MyPState->ring_last - \ - MyPState->n_responses_buffered \ - ) \ -) - -static bool compact_prefetch_buffers(void); -static void consume_prefetch_responses(void); -static bool prefetch_read(PrefetchRequest *slot); -static void prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns); -static bool prefetch_wait_for(uint64 ring_index); -static void prefetch_cleanup_trailing_unused(void); -static inline void prefetch_set_unused(uint64 ring_index); -#if PG_MAJORVERSION_NUM < 17 -static void -GetLastWrittenLSNv(NRelFileInfo relfilenode, ForkNumber forknum, - BlockNumber blkno, int nblocks, XLogRecPtr *lsns); -#endif - -static void -neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, - BlockNumber blkno, neon_request_lsns *output, - BlockNumber nblocks, const bits8 *mask); -static bool neon_prefetch_response_usable(neon_request_lsns *request_lsns, - PrefetchRequest *slot); - -static bool -compact_prefetch_buffers(void) -{ - uint64 empty_ring_index = MyPState->ring_last; - uint64 search_ring_index = MyPState->ring_receive; - int n_moved = 0; - - if (MyPState->ring_receive == MyPState->ring_last) - return false; - - while (search_ring_index > MyPState->ring_last) - { - search_ring_index--; - if (GetPrfSlot(search_ring_index)->status == PRFS_UNUSED) - { - empty_ring_index = search_ring_index; - break; - } - } - - /* - * Here we have established: slots < search_ring_index have an unknown - * state (not scanned) slots >= search_ring_index and <= empty_ring_index - * are unused slots > empty_ring_index are in use, or outside our buffer's - * range. ... unless search_ring_index <= ring_last - * - * Therefore, there is a gap of at least one unused items between - * search_ring_index and empty_ring_index (both inclusive), which grows as - * we hit more unused items while moving backwards through the array. 
- */ - - while (search_ring_index > MyPState->ring_last) - { - PrefetchRequest *source_slot; - PrefetchRequest *target_slot; - bool found; - - /* update search index to an unprocessed entry */ - search_ring_index--; - - source_slot = GetPrfSlot(search_ring_index); - - if (source_slot->status == PRFS_UNUSED) - continue; - - /* slot is used -- start moving slot */ - target_slot = GetPrfSlot(empty_ring_index); - - Assert(source_slot->status == PRFS_RECEIVED); - Assert(target_slot->status == PRFS_UNUSED); - - target_slot->buftag = source_slot->buftag; - target_slot->shard_no = source_slot->shard_no; - target_slot->status = source_slot->status; - target_slot->response = source_slot->response; - target_slot->reqid = source_slot->reqid; - target_slot->request_lsns = source_slot->request_lsns; - target_slot->my_ring_index = empty_ring_index; - - prfh_delete(MyPState->prf_hash, source_slot); - prfh_insert(MyPState->prf_hash, target_slot, &found); - - Assert(!found); - - /* Adjust the location of our known-empty slot */ - empty_ring_index--; - - /* empty the moved slot */ - source_slot->status = PRFS_UNUSED; - source_slot->buftag = (BufferTag) - { - 0 - }; - source_slot->response = NULL; - source_slot->my_ring_index = 0; - source_slot->request_lsns = (neon_request_lsns) { - InvalidXLogRecPtr, InvalidXLogRecPtr, InvalidXLogRecPtr - }; - - /* update bookkeeping */ - n_moved++; - } - - /* - * Only when we've moved slots we can expect trailing unused slots, so - * only then we clean up trailing unused slots. - */ - if (n_moved > 0) - { - prefetch_cleanup_trailing_unused(); - return true; - } - - return false; -} - -/* - * If there might be responses still in the TCP buffer, then - * we should try to use those, so as to reduce any TCP backpressure - * on the OS/PS side. - * - * This procedure handles that. - * - * Note that this is only valid as long as the only pipelined - * operations in the TCP buffer are getPage@Lsn requests. 
- */ -static void -prefetch_pump_state(void) -{ - while (MyPState->ring_receive != MyPState->ring_flush) - { - NeonResponse *response; - PrefetchRequest *slot; - MemoryContext old; - - slot = GetPrfSlot(MyPState->ring_receive); - - old = MemoryContextSwitchTo(MyPState->errctx); - response = page_server->try_receive(slot->shard_no); - MemoryContextSwitchTo(old); - - if (response == NULL) - break; - - /* The slot should still be valid */ - if (slot->status != PRFS_REQUESTED || - slot->response != NULL || - slot->my_ring_index != MyPState->ring_receive) - neon_shard_log(slot->shard_no, ERROR, - "Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu", - slot->status, slot->response, - (long) slot->my_ring_index, (long) MyPState->ring_receive); - - /* update prefetch state */ - MyPState->n_responses_buffered += 1; - MyPState->n_requests_inflight -= 1; - MyPState->ring_receive += 1; - MyNeonCounters->getpage_prefetches_buffered = - MyPState->n_responses_buffered; - - /* update slot state */ - slot->status = PRFS_RECEIVED; - slot->response = response; - } -} - void -readahead_buffer_resize(int newsize, void *extra) +nm_pack_request(StringInfo* s, NeonRequest *msg) { - uint64 end, - nfree = newsize; - PrefetchState *newPState; - Size newprfs_size = offsetof(PrefetchState, prf_buffer) + - (sizeof(PrefetchRequest) * newsize); - - /* don't try to re-initialize if we haven't initialized yet */ - if (MyPState == NULL) - return; - - /* - * Make sure that we don't lose track of active prefetch requests by - * ensuring we have received all but the last n requests (n = newsize). - */ - if (MyPState->n_requests_inflight > newsize) - { - Assert(MyPState->ring_unused >= MyPState->n_requests_inflight - newsize); - prefetch_wait_for(MyPState->ring_unused - (MyPState->n_requests_inflight - newsize)); - Assert(MyPState->n_requests_inflight <= newsize); - } - - /* construct the new PrefetchState, and copy over the memory contexts */ - newPState = MemoryContextAllocZero(TopMemoryContext, newprfs_size); - - newPState->bufctx = MyPState->bufctx; - newPState->errctx = MyPState->errctx; - newPState->hashctx = MyPState->hashctx; - newPState->prf_hash = prfh_create(MyPState->hashctx, newsize, NULL); - newPState->n_unused = newsize; - newPState->n_requests_inflight = 0; - newPState->n_responses_buffered = 0; - newPState->ring_last = newsize; - newPState->ring_unused = newsize; - newPState->ring_receive = newsize; - newPState->max_shard_no = MyPState->max_shard_no; - memcpy(newPState->shard_bitmap, MyPState->shard_bitmap, sizeof(MyPState->shard_bitmap)); - - /* - * Copy over the prefetches. - * - * We populate the prefetch array from the end; to retain the most recent - * prefetches, but this has the benefit of only needing to do one - * iteration on the dataset, and trivial compaction. 
- */ - for (end = MyPState->ring_unused - 1; - end >= MyPState->ring_last && end != UINT64_MAX && nfree != 0; - end -= 1) - { - PrefetchRequest *slot = GetPrfSlot(end); - PrefetchRequest *newslot; - bool found; - - if (slot->status == PRFS_UNUSED) - continue; - - nfree -= 1; - - newslot = &newPState->prf_buffer[nfree]; - *newslot = *slot; - newslot->my_ring_index = nfree; - - prfh_insert(newPState->prf_hash, newslot, &found); - - Assert(!found); - - switch (newslot->status) - { - case PRFS_UNUSED: - pg_unreachable(); - case PRFS_REQUESTED: - newPState->n_requests_inflight += 1; - newPState->ring_receive -= 1; - newPState->ring_last -= 1; - break; - case PRFS_RECEIVED: - newPState->n_responses_buffered += 1; - newPState->ring_last -= 1; - break; - case PRFS_TAG_REMAINS: - newPState->ring_last -= 1; - break; - } - newPState->n_unused -= 1; - } - newPState->ring_flush = newPState->ring_receive; - - MyNeonCounters->getpage_prefetches_buffered = - MyPState->n_responses_buffered; - MyNeonCounters->pageserver_open_requests = - MyPState->n_requests_inflight; - - for (; end >= MyPState->ring_last && end != UINT64_MAX; end -= 1) - { - PrefetchRequest *slot = GetPrfSlot(end); - Assert(slot->status != PRFS_REQUESTED); - if (slot->status == PRFS_RECEIVED) - { - pfree(slot->response); - } - } - - prfh_destroy(MyPState->prf_hash); - pfree(MyPState); - MyPState = newPState; -} - - - -/* - * Make sure that there are no responses still in the buffer. - * - * NOTE: this function may indirectly update MyPState->pfs_hash; which - * invalidates any active pointers into the hash table. - */ -static void -consume_prefetch_responses(void) -{ - if (MyPState->ring_receive < MyPState->ring_unused) - prefetch_wait_for(MyPState->ring_unused - 1); -} - -static void -prefetch_cleanup_trailing_unused(void) -{ - uint64 ring_index; - PrefetchRequest *slot; - - while (MyPState->ring_last < MyPState->ring_receive) - { - ring_index = MyPState->ring_last; - slot = GetPrfSlot(ring_index); - - if (slot->status == PRFS_UNUSED) - MyPState->ring_last += 1; - else - break; - } -} - - -static bool -prefetch_flush_requests(void) -{ - for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++) - { - if (BITMAP_ISSET(MyPState->shard_bitmap, shard_no)) - { - if (!page_server->flush(shard_no)) - return false; - BITMAP_CLR(MyPState->shard_bitmap, shard_no); - } - } - MyPState->max_shard_no = 0; - return true; -} - -/* - * Wait for slot of ring_index to have received its response. - * The caller is responsible for making sure the request buffer is flushed. - * - * NOTE: this function may indirectly update MyPState->pfs_hash; which - * invalidates any active pointers into the hash table. - * NOTE: callers should make sure they can handle query cancellations in this - * function's call path. - */ -static bool -prefetch_wait_for(uint64 ring_index) -{ - PrefetchRequest *entry; - - if (MyPState->ring_flush <= ring_index && - MyPState->ring_unused > MyPState->ring_flush) - { - if (!prefetch_flush_requests()) - return false; - MyPState->ring_flush = MyPState->ring_unused; - } - - Assert(MyPState->ring_unused > ring_index); - - while (MyPState->ring_receive <= ring_index) - { - entry = GetPrfSlot(MyPState->ring_receive); - - Assert(entry->status == PRFS_REQUESTED); - if (!prefetch_read(entry)) - return false; - } - return true; -} - -/* - * Read the response of a prefetch request into its slot. - * - * The caller is responsible for making sure that the request for this buffer - * was flushed to the PageServer. 
- * - * NOTE: this function may indirectly update MyPState->pfs_hash; which - * invalidates any active pointers into the hash table. - * - * NOTE: this does IO, and can get canceled out-of-line. - */ -static bool -prefetch_read(PrefetchRequest *slot) -{ - NeonResponse *response; - MemoryContext old; - BufferTag buftag; - shardno_t shard_no; - uint64 my_ring_index; - - Assert(slot->status == PRFS_REQUESTED); - Assert(slot->response == NULL); - Assert(slot->my_ring_index == MyPState->ring_receive); - - if (slot->status != PRFS_REQUESTED || - slot->response != NULL || - slot->my_ring_index != MyPState->ring_receive) - neon_shard_log(slot->shard_no, ERROR, - "Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu", - slot->status, slot->response, - (long)slot->my_ring_index, (long)MyPState->ring_receive); - - /* - * Copy the request info so that if an error happens and the prefetch - * queue is flushed during the receive call, we can print the original - * values in the error message - */ - buftag = slot->buftag; - shard_no = slot->shard_no; - my_ring_index = slot->my_ring_index; - - old = MemoryContextSwitchTo(MyPState->errctx); - response = (NeonResponse *) page_server->receive(shard_no); - MemoryContextSwitchTo(old); - if (response) - { - /* The slot should still be valid */ - if (slot->status != PRFS_REQUESTED || - slot->response != NULL || - slot->my_ring_index != MyPState->ring_receive) - neon_shard_log(shard_no, ERROR, - "Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu", - slot->status, slot->response, - (long) slot->my_ring_index, (long) MyPState->ring_receive); - - /* update prefetch state */ - MyPState->n_responses_buffered += 1; - MyPState->n_requests_inflight -= 1; - MyPState->ring_receive += 1; - MyNeonCounters->getpage_prefetches_buffered = - MyPState->n_responses_buffered; - - /* update slot state */ - slot->status = PRFS_RECEIVED; - slot->response = response; - return true; - } - else - { - /* - * Note: The slot might no longer be valid, if the connection was lost - * and the prefetch queue was flushed during the receive call - */ - neon_shard_log(shard_no, LOG, - "No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect", - (long) my_ring_index, - RelFileInfoFmt(BufTagGetNRelFileInfo(buftag)), - buftag.forkNum, buftag.blockNum); - return false; - } -} - -/* - * Disconnect hook - drop prefetches when the connection drops - * - * If we don't remove the failed prefetches, we'd be serving incorrect - * data to the smgr. - */ -void -prefetch_on_ps_disconnect(void) -{ - MyPState->ring_flush = MyPState->ring_unused; - - while (MyPState->ring_receive < MyPState->ring_unused) - { - PrefetchRequest *slot; - uint64 ring_index = MyPState->ring_receive; - - slot = GetPrfSlot(ring_index); - - Assert(slot->status == PRFS_REQUESTED); - Assert(slot->my_ring_index == ring_index); - - /* - * Drop connection to all shards which have prefetch requests. - * It is not a problem to call disconnect multiple times on the same connection - * because disconnect implementation in libpagestore.c will check if connection - * is alive and do nothing of connection was already dropped. 
- */ - page_server->disconnect(slot->shard_no); - - /* clean up the request */ - slot->status = PRFS_TAG_REMAINS; - MyPState->n_requests_inflight -= 1; - MyPState->ring_receive += 1; - - prefetch_set_unused(ring_index); - pgBufferUsage.prefetch.expired += 1; - MyNeonCounters->getpage_prefetch_discards_total += 1; - } - - /* - * We can have gone into retry due to network error, so update stats with - * the latest available - */ - MyNeonCounters->pageserver_open_requests = - MyPState->n_requests_inflight; - MyNeonCounters->getpage_prefetches_buffered = - MyPState->n_responses_buffered; -} - -/* - * prefetch_set_unused() - clear a received prefetch slot - * - * The slot at ring_index must be a current member of the ring buffer, - * and may not be in the PRFS_REQUESTED state. - * - * NOTE: this function will update MyPState->pfs_hash; which invalidates any - * active pointers into the hash table. - */ -static inline void -prefetch_set_unused(uint64 ring_index) -{ - PrefetchRequest *slot; - - if (ring_index < MyPState->ring_last) - return; /* Should already be unused */ - - slot = GetPrfSlot(ring_index); - if (slot->status == PRFS_UNUSED) - return; - - Assert(slot->status == PRFS_RECEIVED || slot->status == PRFS_TAG_REMAINS); - - if (slot->status == PRFS_RECEIVED) - { - pfree(slot->response); - slot->response = NULL; - - MyPState->n_responses_buffered -= 1; - MyPState->n_unused += 1; - - MyNeonCounters->getpage_prefetches_buffered = - MyPState->n_responses_buffered; - } - else - { - Assert(slot->response == NULL); - } - - prfh_delete(MyPState->prf_hash, slot); - - /* clear all fields */ - MemSet(slot, 0, sizeof(PrefetchRequest)); - slot->status = PRFS_UNUSED; - - /* run cleanup if we're holding back ring_last */ - if (MyPState->ring_last == ring_index) - prefetch_cleanup_trailing_unused(); - - /* - * ... and try to store the buffered responses more compactly if > 12.5% - * of the buffer is gaps - */ - else if (ReceiveBufferNeedsCompaction()) - compact_prefetch_buffers(); -} - -/* - * Send one prefetch request to the pageserver. To wait for the response, call - * prefetch_wait_for(). 
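In the communicator model this send-now, wait-later pairing shrinks to filling in a NeonCommunicatorRequest and queueing it. Below is a minimal sketch of a single-block speculative prefetch using the field layout that appears in the new neon_prefetch() hunks later in this patch; shard_for_block() is a hypothetical stand-in for the shard-routing helper, which is not defined in these hunks:

/*
 * Sketch, not the patch's code: queue one speculative getpage request and
 * return immediately.  The communicator collects the response asynchronously.
 */
static void
prefetch_one_block_sketch(SMgrRelation reln, ForkNumber forknum,
						  BlockNumber blkno, neon_request_lsns *lsns)
{
	NeonCommunicatorRequest req = {
		.hdr.tag = T_NeonGetPageRequest,
		.hdr.bufid = InvalidBuffer,		/* pure prefetch: no target shared buffer */
		.hdr.lsn = lsns->request_lsn,
		.hdr.not_modified_since = lsns->not_modified_since,
		.page.rinfo = InfoFromSMgrRel(reln),
		.page.forknum = forknum,
		.page.blocknum = blkno,
	};

	/* fire and forget; the backend does not wait for a reply */
	communicator_send_request(shard_for_block(InfoFromSMgrRel(reln), blkno), &req);
}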
- */ -static void -prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns) -{ - bool found; - uint64 mySlotNo PG_USED_FOR_ASSERTS_ONLY = slot->my_ring_index; - - NeonGetPageRequest request = { - .hdr.tag = T_NeonGetPageRequest, - .hdr.reqid = GENERATE_REQUEST_ID(), - /* lsn and not_modified_since are filled in below */ - .rinfo = BufTagGetNRelFileInfo(slot->buftag), - .forknum = slot->buftag.forkNum, - .blkno = slot->buftag.blockNum, - }; - - Assert(mySlotNo == MyPState->ring_unused); - - slot->reqid = request.hdr.reqid; - - if (force_request_lsns) - slot->request_lsns = *force_request_lsns; - else - neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag), - slot->buftag.forkNum, slot->buftag.blockNum, - &slot->request_lsns, 1, NULL); - request.hdr.lsn = slot->request_lsns.request_lsn; - request.hdr.not_modified_since = slot->request_lsns.not_modified_since; - - Assert(slot->response == NULL); - Assert(slot->my_ring_index == MyPState->ring_unused); - - while (!page_server->send(slot->shard_no, (NeonRequest *) &request)) - { - Assert(mySlotNo == MyPState->ring_unused); - /* loop */ - } - - /* update prefetch state */ - MyPState->n_requests_inflight += 1; - MyPState->n_unused -= 1; - MyPState->ring_unused += 1; - BITMAP_SET(MyPState->shard_bitmap, slot->shard_no); - MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no); - - /* update slot state */ - slot->status = PRFS_REQUESTED; - prfh_insert(MyPState->prf_hash, slot, &found); - Assert(!found); -} - -/* - * prefetch_register_bufferv() - register and prefetch buffers - * - * Register that we may want the contents of BufferTag in the near future. - * This is used when issuing a speculative prefetch request, but also when - * performing a synchronous request and need the buffer right now. - * - * If force_request_lsns is not NULL, those values are sent to the - * pageserver. If NULL, we utilize the lastWrittenLsn -infrastructure - * to calculate the LSNs to send. - * - * When performing a prefetch rather than a synchronous request, - * is_prefetch==true. Currently, it only affects how the request is accounted - * in the perf counters. - * - * NOTE: this function may indirectly update MyPState->pfs_hash; which - * invalidates any active pointers into the hash table. - */ -static uint64 -prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns, - BlockNumber nblocks, const bits8 *mask, - bool is_prefetch) -{ - uint64 min_ring_index; - PrefetchRequest hashkey; -#ifdef USE_ASSERT_CHECKING - bool any_hits = false; -#endif - /* We will never read further ahead than our buffer can store. */ - nblocks = Max(1, Min(nblocks, readahead_buffer_size)); - - /* - * Use an intermediate PrefetchRequest struct as the hash key to ensure - * correct alignment and that the padding bytes are cleared. 
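The padding point above is easy to miss: two keys that compare equal field by field can still hash differently if compiler-inserted padding bytes contain garbage, which is why the key struct is zeroed before its fields are assigned. A tiny self-contained illustration of the pattern:

#include <string.h>

struct key
{
	char		a;				/* padding bytes typically follow this field */
	int			b;
};

/* zero the whole struct first so the padding is deterministic, then assign */
static struct key
make_key(char a, int b)
{
	struct key	k;

	memset(&k, 0, sizeof(k));
	k.a = a;
	k.b = b;
	return k;
}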
- */ - memset(&hashkey.buftag, 0, sizeof(BufferTag)); - hashkey.buftag = tag; - -Retry: - /* - * We can have gone into retry due to network error, so update stats with - * the latest available - */ - MyNeonCounters->pageserver_open_requests = - MyPState->ring_unused - MyPState->ring_receive; - MyNeonCounters->getpage_prefetches_buffered = - MyPState->n_responses_buffered; - - min_ring_index = UINT64_MAX; - for (int i = 0; i < nblocks; i++) - { - PrefetchRequest *slot = NULL; - PrfHashEntry *entry = NULL; - uint64 ring_index; - neon_request_lsns *lsns; - - if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i)) - continue; - - if (frlsns) - lsns = &frlsns[i]; - else - lsns = NULL; - -#ifdef USE_ASSERT_CHECKING - any_hits = true; -#endif - - slot = NULL; - entry = NULL; - - hashkey.buftag.blockNum = tag.blockNum + i; - entry = prfh_lookup(MyPState->prf_hash, &hashkey); - - if (entry != NULL) - { - slot = entry->slot; - ring_index = slot->my_ring_index; - Assert(slot == GetPrfSlot(ring_index)); - - Assert(slot->status != PRFS_UNUSED); - Assert(MyPState->ring_last <= ring_index && - ring_index < MyPState->ring_unused); - Assert(BufferTagsEqual(&slot->buftag, &hashkey.buftag)); - - /* - * If the caller specified a request LSN to use, only accept - * prefetch responses that satisfy that request. - */ - if (lsns) - { - if (!neon_prefetch_response_usable(lsns, slot)) - { - /* Wait for the old request to finish and discard it */ - if (!prefetch_wait_for(ring_index)) - goto Retry; - prefetch_set_unused(ring_index); - entry = NULL; - slot = NULL; - pgBufferUsage.prefetch.expired += 1; - MyNeonCounters->getpage_prefetch_discards_total += 1; - } - } - - if (entry != NULL) - { - /* - * We received a prefetch for a page that was recently read - * and removed from the buffers. Remove that request from the - * buffers. - */ - if (slot->status == PRFS_TAG_REMAINS) - { - prefetch_set_unused(ring_index); - entry = NULL; - slot = NULL; - } - else - { - min_ring_index = Min(min_ring_index, ring_index); - /* The buffered request is good enough, return that index */ - if (is_prefetch) - pgBufferUsage.prefetch.duplicates++; - else - pgBufferUsage.prefetch.hits++; - continue; - } - } - } - else if (!is_prefetch) - { - pgBufferUsage.prefetch.misses += 1; - MyNeonCounters->getpage_prefetch_misses_total++; - } - /* - * We can only leave the block above by finding that there's - * no entry that can satisfy this request, either because there - * was no entry, or because the entry was invalid or didn't satisfy - * the LSNs provided. - * - * The code should've made sure to clear up the data. - */ - Assert(entry == NULL); - Assert(slot == NULL); - - /* There should be no buffer overflow */ - Assert(MyPState->ring_last + readahead_buffer_size >= MyPState->ring_unused); - - /* - * If the prefetch queue is full, we need to make room by clearing the - * oldest slot. If the oldest slot holds a buffer that was already - * received, we can just throw it away; we fetched the page - * unnecessarily in that case. If the oldest slot holds a request that - * we haven't received a response for yet, we have to wait for the - * response to that before we can continue. We might not have even - * flushed the request to the pageserver yet, it might be just sitting - * in the output buffer. In that case, we flush it and wait for the - * response. 
(We could decide not to send it, but it's hard to abort - * when the request is already in the output buffer, and 'not sending' - * a prefetch request kind of goes against the principles of - * prefetching) - */ - if (MyPState->ring_last + readahead_buffer_size == MyPState->ring_unused) - { - uint64 cleanup_index = MyPState->ring_last; - - slot = GetPrfSlot(cleanup_index); - - Assert(slot->status != PRFS_UNUSED); - - /* - * If there is good reason to run compaction on the prefetch buffers, - * try to do that. - */ - if (ReceiveBufferNeedsCompaction() && compact_prefetch_buffers()) - { - Assert(slot->status == PRFS_UNUSED); - } - else - { - /* - * We have the slot for ring_last, so that must still be in - * progress - */ - switch (slot->status) - { - case PRFS_REQUESTED: - Assert(MyPState->ring_receive == cleanup_index); - if (!prefetch_wait_for(cleanup_index)) - goto Retry; - prefetch_set_unused(cleanup_index); - pgBufferUsage.prefetch.expired += 1; - MyNeonCounters->getpage_prefetch_discards_total += 1; - break; - case PRFS_RECEIVED: - case PRFS_TAG_REMAINS: - prefetch_set_unused(cleanup_index); - pgBufferUsage.prefetch.expired += 1; - MyNeonCounters->getpage_prefetch_discards_total += 1; - break; - default: - pg_unreachable(); - } - } - } - - /* - * The next buffer pointed to by `ring_unused` is now definitely empty, so - * we can insert the new request to it. - */ - ring_index = MyPState->ring_unused; - - Assert(MyPState->ring_last <= ring_index && - ring_index <= MyPState->ring_unused); - - slot = GetPrfSlotNoCheck(ring_index); - - Assert(slot->status == PRFS_UNUSED); - - /* - * We must update the slot data before insertion, because the hash - * function reads the buffer tag from the slot. - */ - slot->buftag = hashkey.buftag; - slot->shard_no = get_shard_number(&tag); - slot->my_ring_index = ring_index; - - min_ring_index = Min(min_ring_index, ring_index); - - if (is_prefetch) - MyNeonCounters->getpage_prefetch_requests_total++; - else - MyNeonCounters->getpage_sync_requests_total++; - - prefetch_do_request(slot, lsns); - } - - MyNeonCounters->pageserver_open_requests = - MyPState->ring_unused - MyPState->ring_receive; - - Assert(any_hits); - - Assert(GetPrfSlot(min_ring_index)->status == PRFS_REQUESTED || - GetPrfSlot(min_ring_index)->status == PRFS_RECEIVED); - Assert(MyPState->ring_last <= min_ring_index && - min_ring_index < MyPState->ring_unused); - - if (flush_every_n_requests > 0 && - MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests) - { - if (!prefetch_flush_requests()) - { - /* - * Prefetch set is reset in case of error, so we should try to - * register our request once again - */ - goto Retry; - } - MyPState->ring_flush = MyPState->ring_unused; - } - - return min_ring_index; -} - -static bool -equal_requests(NeonRequest* a, NeonRequest* b) -{ - return a->reqid == b->reqid && a->lsn == b->lsn && a->not_modified_since == b->not_modified_since; -} - - -/* - * Note: this function can get canceled and use a long jump to the next catch - * context. Take care. 
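The synchronous path removed below (send, flush, drain outstanding prefetches, then receive) is what communicator_request() replaces: the backend fills in a NeonCommunicatorRequest, hands it to the communicator, and blocks until the scalar result comes back. A sketch of that calling convention, mirroring the relation-size hunk later in this patch; error reporting is assumed to happen inside the communicator and is not shown here:

/*
 * Sketch of the new synchronous calling convention.  Relation metadata is
 * served by shard 0, and communicator_request() blocks until the result for
 * this backend has been posted.
 */
static BlockNumber
relsize_via_communicator_sketch(NRelFileInfo rinfo, ForkNumber forknum,
								neon_request_lsns *lsns)
{
	NeonCommunicatorRequest req = {
		.hdr.tag = T_NeonNblocksRequest,
		.hdr.bufid = InvalidBuffer,		/* no page payload expected */
		.hdr.lsn = lsns->request_lsn,
		.hdr.not_modified_since = lsns->not_modified_since,
		.nblocks.rinfo = rinfo,
		.nblocks.forknum = forknum,
	};

	return (BlockNumber) communicator_request(0, &req);
}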
- */ -static NeonResponse * -page_server_request(void const *req) -{ - NeonResponse *resp; - BufferTag tag = {0}; - shardno_t shard_no; - - switch (messageTag(req)) - { - case T_NeonExistsRequest: - CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo); - break; - case T_NeonNblocksRequest: - CopyNRelFileInfoToBufTag(tag, ((NeonNblocksRequest *) req)->rinfo); - break; - case T_NeonDbSizeRequest: - NInfoGetDbOid(BufTagGetNRelFileInfo(tag)) = ((NeonDbSizeRequest *) req)->dbNode; - break; - case T_NeonGetPageRequest: - CopyNRelFileInfoToBufTag(tag, ((NeonGetPageRequest *) req)->rinfo); - tag.blockNum = ((NeonGetPageRequest *) req)->blkno; - break; - default: - neon_log(ERROR, "Unexpected request tag: %d", messageTag(req)); - } - shard_no = get_shard_number(&tag); - - /* - * Current sharding model assumes that all metadata is present only at shard 0. - * We still need to call get_shard_no() to check if shard map is up-to-date. - */ - if (((NeonRequest *) req)->tag != T_NeonGetPageRequest) - { - shard_no = 0; - } - - do - { - PG_TRY(); - { - while (!page_server->send(shard_no, (NeonRequest *) req) - || !page_server->flush(shard_no)) - { - /* do nothing */ - } - MyNeonCounters->pageserver_open_requests++; - consume_prefetch_responses(); - resp = page_server->receive(shard_no); - MyNeonCounters->pageserver_open_requests--; - } - PG_CATCH(); - { - /* - * Cancellation in this code needs to be handled better at some - * point, but this currently seems fine for now. - */ - page_server->disconnect(shard_no); - MyNeonCounters->pageserver_open_requests = 0; - - PG_RE_THROW(); - } - PG_END_TRY(); - - } while (resp == NULL); - - return resp; -} - - -StringInfoData -nm_pack_request(NeonRequest *msg) -{ - StringInfoData s; - - initStringInfo(&s); - - pq_sendbyte(&s, msg->tag); - if (neon_protocol_version >= 3) - { - pq_sendint64(&s, msg->reqid); - } - pq_sendint64(&s, msg->lsn); - pq_sendint64(&s, msg->not_modified_since); + resetStringInfo(s); + pq_sendbyte(s, msg->tag); + pq_sendint64(s, msg->u.reqid); + pq_sendint64(s, msg->lsn); + pq_sendint64(s, msg->not_modified_since); switch (messageTag(msg)) { @@ -1257,10 +134,10 @@ nm_pack_request(NeonRequest *msg) { NeonExistsRequest *msg_req = (NeonExistsRequest *) msg; - pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo)); - pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo)); - pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo)); - pq_sendbyte(&s, msg_req->forknum); + pq_sendint32(s, NInfoGetSpcOid(msg_req->rinfo)); + pq_sendint32(s, NInfoGetDbOid(msg_req->rinfo)); + pq_sendint32(s, NInfoGetRelNumber(msg_req->rinfo)); + pq_sendbyte(s, msg_req->forknum); break; } @@ -1268,10 +145,10 @@ nm_pack_request(NeonRequest *msg) { NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg; - pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo)); - pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo)); - pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo)); - pq_sendbyte(&s, msg_req->forknum); + pq_sendint32(s, NInfoGetSpcOid(msg_req->rinfo)); + pq_sendint32(s, NInfoGetDbOid(msg_req->rinfo)); + pq_sendint32(s, NInfoGetRelNumber(msg_req->rinfo)); + pq_sendbyte(s, msg_req->forknum); break; } @@ -1279,7 +156,7 @@ nm_pack_request(NeonRequest *msg) { NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg; - pq_sendint32(&s, msg_req->dbNode); + pq_sendint32(s, msg_req->dbNode); break; } @@ -1287,11 +164,11 @@ nm_pack_request(NeonRequest *msg) { NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg; - pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo)); - pq_sendint32(&s, 
NInfoGetDbOid(msg_req->rinfo)); - pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo)); - pq_sendbyte(&s, msg_req->forknum); - pq_sendint32(&s, msg_req->blkno); + pq_sendint32(s, NInfoGetSpcOid(msg_req->rinfo)); + pq_sendint32(s, NInfoGetDbOid(msg_req->rinfo)); + pq_sendint32(s, NInfoGetRelNumber(msg_req->rinfo)); + pq_sendbyte(s, msg_req->forknum); + pq_sendint32(s, msg_req->blkno); break; } @@ -1300,8 +177,8 @@ nm_pack_request(NeonRequest *msg) { NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg; - pq_sendbyte(&s, msg_req->kind); - pq_sendint32(&s, msg_req->segno); + pq_sendbyte(s, msg_req->kind); + pq_sendint32(s, msg_req->segno); break; } @@ -1320,6 +197,15 @@ nm_pack_request(NeonRequest *msg) return s; } +/* + * We can not used palloc because it is not thread safe + &*/ +void* +static memalloc(size_t size) +{ + return calloc(size, 1); +} + NeonResponse * nm_unpack_response(StringInfo s) { @@ -1328,26 +214,20 @@ nm_unpack_response(StringInfo s) NeonResponse *resp = NULL; resp_hdr.tag = tag; - if (neon_protocol_version >= 3) - { - resp_hdr.reqid = pq_getmsgint64(s); - resp_hdr.lsn = pq_getmsgint64(s); - resp_hdr.not_modified_since = pq_getmsgint64(s); - } + resp_hdr.u.reqid = pq_getmsgint64(s); + resp_hdr.lsn = pq_getmsgint64(s); + resp_hdr.not_modified_since = pq_getmsgint64(s); switch (tag) { /* pagestore -> pagestore_client */ case T_NeonExistsResponse: { - NeonExistsResponse *msg_resp = palloc0(sizeof(NeonExistsResponse)); + NeonExistsResponse *msg_resp = memalloc(sizeof(NeonExistsResponse)); - if (neon_protocol_version >= 3) - { - NInfoGetSpcOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4); - NInfoGetDbOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4); - NInfoGetRelNumber(msg_resp->req.rinfo) = pq_getmsgint(s, 4); - msg_resp->req.forknum = pq_getmsgbyte(s); - } + NInfoGetSpcOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4); + NInfoGetDbOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4); + NInfoGetRelNumber(msg_resp->req.rinfo) = pq_getmsgint(s, 4); + msg_resp->req.forknum = pq_getmsgbyte(s); msg_resp->req.hdr = resp_hdr; msg_resp->exists = pq_getmsgbyte(s); pq_getmsgend(s); @@ -1358,15 +238,12 @@ nm_unpack_response(StringInfo s) case T_NeonNblocksResponse: { - NeonNblocksResponse *msg_resp = palloc0(sizeof(NeonNblocksResponse)); + NeonNblocksResponse *msg_resp = memalloc(sizeof(NeonNblocksResponse)); - if (neon_protocol_version >= 3) - { - NInfoGetSpcOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4); - NInfoGetDbOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4); - NInfoGetRelNumber(msg_resp->req.rinfo) = pq_getmsgint(s, 4); - msg_resp->req.forknum = pq_getmsgbyte(s); - } + NInfoGetSpcOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4); + NInfoGetDbOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4); + NInfoGetRelNumber(msg_resp->req.rinfo) = pq_getmsgint(s, 4); + msg_resp->req.forknum = pq_getmsgbyte(s); msg_resp->req.hdr = resp_hdr; msg_resp->n_blocks = pq_getmsgint(s, 4); pq_getmsgend(s); @@ -1379,15 +256,12 @@ nm_unpack_response(StringInfo s) { NeonGetPageResponse *msg_resp; - msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE); - if (neon_protocol_version >= 3) - { - NInfoGetSpcOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4); - NInfoGetDbOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4); - NInfoGetRelNumber(msg_resp->req.rinfo) = pq_getmsgint(s, 4); - msg_resp->req.forknum = pq_getmsgbyte(s); - msg_resp->req.blkno = pq_getmsgint(s, 4); - } + msg_resp = memalloc(PS_GETPAGERESPONSE_SIZE); + NInfoGetSpcOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4); + 
NInfoGetDbOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4); + NInfoGetRelNumber(msg_resp->req.rinfo) = pq_getmsgint(s, 4); + msg_resp->req.forknum = pq_getmsgbyte(s); + msg_resp->req.blkno = pq_getmsgint(s, 4); msg_resp->req.hdr = resp_hdr; /* XXX: should be varlena */ memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ); @@ -1401,12 +275,9 @@ nm_unpack_response(StringInfo s) case T_NeonDbSizeResponse: { - NeonDbSizeResponse *msg_resp = palloc0(sizeof(NeonDbSizeResponse)); + NeonDbSizeResponse *msg_resp = memalloc(sizeof(NeonDbSizeResponse)); - if (neon_protocol_version >= 3) - { - msg_resp->req.dbNode = pq_getmsgint(s, 4); - } + msg_resp->req.dbNode = pq_getmsgint(s, 4); msg_resp->req.hdr = resp_hdr; msg_resp->db_size = pq_getmsgint64(s); pq_getmsgend(s); @@ -1424,7 +295,7 @@ nm_unpack_response(StringInfo s) msgtext = pq_getmsgrawstring(s); msglen = strlen(msgtext); - msg_resp = palloc0(sizeof(NeonErrorResponse) + msglen + 1); + msg_resp = memalloc(sizeof(NeonErrorResponse) + msglen + 1); msg_resp->req = resp_hdr; memcpy(msg_resp->message, msgtext, msglen + 1); pq_getmsgend(s); @@ -1437,7 +308,7 @@ nm_unpack_response(StringInfo s) { NeonGetSlruSegmentResponse *msg_resp; int n_blocks; - msg_resp = palloc0(sizeof(NeonGetSlruSegmentResponse)); + msg_resp = memalloc(sizeof(NeonGetSlruSegmentResponse)); if (neon_protocol_version >= 3) { @@ -1473,152 +344,6 @@ nm_unpack_response(StringInfo s) return resp; } -/* dump to json for debugging / error reporting purposes */ -char * -nm_to_string(NeonMessage *msg) -{ - StringInfoData s; - - initStringInfo(&s); - - switch (messageTag(msg)) - { - /* pagestore_client -> pagestore */ - case T_NeonExistsRequest: - { - NeonExistsRequest *msg_req = (NeonExistsRequest *) msg; - - appendStringInfoString(&s, "{\"type\": \"NeonExistsRequest\""); - appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo)); - appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum); - appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.lsn)); - appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.not_modified_since)); - appendStringInfoChar(&s, '}'); - break; - } - - case T_NeonNblocksRequest: - { - NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg; - - appendStringInfoString(&s, "{\"type\": \"NeonNblocksRequest\""); - appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo)); - appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum); - appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.lsn)); - appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.not_modified_since)); - appendStringInfoChar(&s, '}'); - break; - } - - case T_NeonGetPageRequest: - { - NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg; - - appendStringInfoString(&s, "{\"type\": \"NeonGetPageRequest\""); - appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo)); - appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum); - appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno); - appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.lsn)); - appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.not_modified_since)); - appendStringInfoChar(&s, '}'); - break; - } - case T_NeonDbSizeRequest: - { - NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg; - - appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\""); - appendStringInfo(&s, ", 
\"dbnode\": \"%u\"", msg_req->dbNode); - appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.lsn)); - appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.not_modified_since)); - appendStringInfoChar(&s, '}'); - break; - } - case T_NeonGetSlruSegmentRequest: - { - NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg; - - appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentRequest\""); - appendStringInfo(&s, ", \"kind\": %u", msg_req->kind); - appendStringInfo(&s, ", \"segno\": %u", msg_req->segno); - appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.lsn)); - appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.not_modified_since)); - appendStringInfoChar(&s, '}'); - break; - } - /* pagestore -> pagestore_client */ - case T_NeonExistsResponse: - { - NeonExistsResponse *msg_resp = (NeonExistsResponse *) msg; - - appendStringInfoString(&s, "{\"type\": \"NeonExistsResponse\""); - appendStringInfo(&s, ", \"exists\": %d}", - msg_resp->exists); - appendStringInfoChar(&s, '}'); - - break; - } - case T_NeonNblocksResponse: - { - NeonNblocksResponse *msg_resp = (NeonNblocksResponse *) msg; - - appendStringInfoString(&s, "{\"type\": \"NeonNblocksResponse\""); - appendStringInfo(&s, ", \"n_blocks\": %u}", - msg_resp->n_blocks); - appendStringInfoChar(&s, '}'); - - break; - } - case T_NeonGetPageResponse: - { -#if 0 - NeonGetPageResponse *msg_resp = (NeonGetPageResponse *) msg; -#endif - - appendStringInfoString(&s, "{\"type\": \"NeonGetPageResponse\""); - appendStringInfo(&s, ", \"page\": \"XXX\"}"); - appendStringInfoChar(&s, '}'); - break; - } - case T_NeonErrorResponse: - { - NeonErrorResponse *msg_resp = (NeonErrorResponse *) msg; - - /* FIXME: escape double-quotes in the message */ - appendStringInfoString(&s, "{\"type\": \"NeonErrorResponse\""); - appendStringInfo(&s, ", \"message\": \"%s\"}", msg_resp->message); - appendStringInfoChar(&s, '}'); - break; - } - case T_NeonDbSizeResponse: - { - NeonDbSizeResponse *msg_resp = (NeonDbSizeResponse *) msg; - - appendStringInfoString(&s, "{\"type\": \"NeonDbSizeResponse\""); - appendStringInfo(&s, ", \"db_size\": %ld}", - msg_resp->db_size); - appendStringInfoChar(&s, '}'); - - break; - } - case T_NeonGetSlruSegmentResponse: - { - NeonGetSlruSegmentResponse *msg_resp = (NeonGetSlruSegmentResponse *) msg; - - appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentResponse\""); - appendStringInfo(&s, ", \"n_blocks\": %u}", - msg_resp->n_blocks); - appendStringInfoChar(&s, '}'); - - break; - } - - default: - appendStringInfo(&s, "{\"type\": \"unknown 0x%02x\"", msg->tag); - } - return s.data; -} - /* * Wrapper around log_newpage() that makes a temporary copy of the block and * WAL-logs that. 
This makes it safe to use while holding only a shared lock @@ -1984,27 +709,6 @@ neon_init(void) elog(ERROR, "MyNeonCounters points past end of array"); #endif - prfs_size = offsetof(PrefetchState, prf_buffer) + - sizeof(PrefetchRequest) * readahead_buffer_size; - - MyPState = MemoryContextAllocZero(TopMemoryContext, prfs_size); - - MyPState->n_unused = readahead_buffer_size; - - MyPState->bufctx = SlabContextCreate(TopMemoryContext, - "NeonSMGR/prefetch", - SLAB_DEFAULT_BLOCK_SIZE * 17, - PS_GETPAGERESPONSE_SIZE); - MyPState->errctx = AllocSetContextCreate(TopMemoryContext, - "NeonSMGR/errors", - ALLOCSET_DEFAULT_SIZES); - MyPState->hashctx = AllocSetContextCreate(TopMemoryContext, - "NeonSMGR/prefetch", - ALLOCSET_DEFAULT_SIZES); - - MyPState->prf_hash = prfh_create(MyPState->hashctx, - readahead_buffer_size, NULL); - old_redo_read_buffer_filter = redo_read_buffer_filter; redo_read_buffer_filter = neon_redo_read_buffer_filter; @@ -2248,116 +952,12 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, } } -/* - * neon_prefetch_response_usable -- Can a new request be satisfied by old one? - * - * This is used to check if the response to a prefetch request can be used to - * satisfy a page read now. - */ -static bool -neon_prefetch_response_usable(neon_request_lsns *request_lsns, - PrefetchRequest *slot) -{ - /* sanity check the LSN's on the old and the new request */ - Assert(request_lsns->request_lsn >= request_lsns->not_modified_since); - Assert(request_lsns->effective_request_lsn >= request_lsns->not_modified_since); - Assert(request_lsns->effective_request_lsn <= request_lsns->request_lsn); - Assert(slot->request_lsns.request_lsn >= slot->request_lsns.not_modified_since); - Assert(slot->request_lsns.effective_request_lsn >= slot->request_lsns.not_modified_since); - Assert(slot->request_lsns.effective_request_lsn <= slot->request_lsns.request_lsn); - Assert(slot->status != PRFS_UNUSED); - - /* - * The new request's LSN should never be older than the old one. This - * could be an Assert, except that for testing purposes, we do provide an - * interface in neon_test_utils to fetch pages at arbitary LSNs, which - * violates this. - * - * Similarly, the not_modified_since value calculated for a page should - * never move backwards. This assumption is a bit fragile; if we updated - * the last-written cache when we read in a page, for example, then it - * might. But as the code stands, it should not. - * - * (If two backends issue a request at the same time, they might race and - * calculate LSNs "out of order" with each other, but the prefetch queue - * is backend-private at the moment.) - */ - if (request_lsns->effective_request_lsn < slot->request_lsns.effective_request_lsn || - request_lsns->not_modified_since < slot->request_lsns.not_modified_since) - { - ereport(LOG, - (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "request with unexpected LSN after prefetch"), - errdetail("Request %X/%X not_modified_since %X/%X, prefetch %X/%X not_modified_since %X/%X)", - LSN_FORMAT_ARGS(request_lsns->effective_request_lsn), - LSN_FORMAT_ARGS(request_lsns->not_modified_since), - LSN_FORMAT_ARGS(slot->request_lsns.effective_request_lsn), - LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since)))); - return false; - } - - /*--- - * Each request to the pageserver has three LSN values associated with it: - * `not_modified_since`, `request_lsn`, and 'effective_request_lsn'. 
- * `not_modified_since` and `request_lsn` are sent to the pageserver, but - * in the primary node, we always use UINT64_MAX as the `request_lsn`, so - * we remember `effective_request_lsn` separately. In a primary, - * `effective_request_lsn` is the last flush WAL position when the request - * was sent to the pageserver. That's logically the LSN that we are - * requesting the page at, but we send UINT64_MAX to the pageserver so - * that if the GC horizon advances past that position, we still get a - * valid response instead of an error. - * - * To determine whether a response to a GetPage request issued earlier is - * still valid to satisfy a new page read, we look at the - * (not_modified_since, effective_request_lsn] range of the request. It is - * effectively a claim that the page has not been modified between those - * LSNs. If the range of the old request in the queue overlaps with the - * new request, we know that the page hasn't been modified in the union of - * the ranges. We can use the response to old request to satisfy the new - * request in that case. For example: - * - * 100 500 - * Old request: +--------+ - * - * 400 800 - * New request: +--------+ - * - * The old request claims that the page was not modified between LSNs 100 - * and 500, and the second claims that it was not modified between 400 and - * 800. Together they mean that the page was not modified between 100 and - * 800. Therefore the response to the old request is also valid for the - * new request. - * - * This logic also holds at the boundary case that the old request's LSN - * matches the new request's not_modified_since LSN exactly: - * - * 100 500 - * Old request: +--------+ - * - * 500 900 - * New request: +--------+ - * - * The response to the old request is the page as it was at LSN 500, and - * the page hasn't been changed in the range (500, 900], therefore the - * response is valid also for the new request. - */ - - /* this follows from the checks above */ - Assert(request_lsns->effective_request_lsn >= slot->request_lsns.not_modified_since); - - return request_lsns->not_modified_since <= slot->request_lsns.effective_request_lsn; -} - /* * neon_exists() -- Does the physical file exist? 
*/ static bool neon_exists(SMgrRelation reln, ForkNumber forkNum) { - bool exists; - NeonResponse *resp; - BlockNumber n_blocks; neon_request_lsns request_lsns; switch (reln->smgr_relpersistence) @@ -2416,66 +1016,16 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum) neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL); { - NeonExistsRequest request = { - .hdr.tag = T_NeonExistsRequest, - .hdr.reqid = GENERATE_REQUEST_ID(), + NeonCommunicatorRequest request = { + .hdr.tag = T_NeonNblocksRequest, + .hdr.bufid = InvalidBuffer, .hdr.lsn = request_lsns.request_lsn, .hdr.not_modified_since = request_lsns.not_modified_since, - .rinfo = InfoFromSMgrRel(reln), - .forknum = forkNum + .exists.rinfo = InfoFromSMgrRel(reln), + .exists.forknum = forknum, }; - - resp = page_server_request(&request); - - switch (resp->tag) - { - case T_NeonExistsResponse: - { - NeonExistsResponse* exists_resp = (NeonExistsResponse *) resp; - if (neon_protocol_version >= 3) - { - if (!equal_requests(resp, &request.hdr) || - !RelFileInfoEquals(exists_resp->req.rinfo, request.rinfo) || - exists_resp->req.forknum != request.forknum) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to exits request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(exists_resp->req.rinfo), exists_resp->req.forknum, - request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), RelFileInfoFmt(request.rinfo), request.forknum); - } - } - exists = exists_resp->exists; - break; - } - case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (!equal_requests(resp, &request.hdr)) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since)); - } - } - ereport(ERROR, - (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[reqid %lx] could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X", - resp->reqid, - RelFileInfoFmt(InfoFromSMgrRel(reln)), - forkNum, - LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), - errdetail("page server returned error: %s", - ((NeonErrorResponse *) resp)->message))); - break; - - default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Expected Exists (0x%02x) or Error (0x%02x) response to ExistsRequest, but got 0x%02x", - T_NeonExistsResponse, T_NeonErrorResponse, resp->tag); - } - pfree(resp); + return communicator_request(0, &request) != 0; } - return exists; } /* @@ -2808,7 +1358,6 @@ static bool neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, int nblocks) { - uint64 ring_index PG_USED_FOR_ASSERTS_ONLY; BufferTag tag; switch (reln->smgr_relpersistence) @@ -2825,41 +1374,22 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - tag.spcOid = reln->smgr_rlocator.locator.spcOid; - tag.dbOid = reln->smgr_rlocator.locator.dbOid; - tag.relNumber = reln->smgr_rlocator.locator.relNumber; - tag.forkNum = forknum; - - while (nblocks > 0) + for (int i = 0; i < nblocks; i++) { - int iterblocks = Min(nblocks, PG_IOV_MAX); - 
bits8 lfc_present[PG_IOV_MAX / 8]; - memset(lfc_present, 0, sizeof(lfc_present)); - - if (lfc_cache_containsv(InfoFromSMgrRel(reln), forknum, blocknum, - iterblocks, lfc_present) == iterblocks) + if (!lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum + i)) { - nblocks -= iterblocks; - blocknum += iterblocks; - continue; + NeonCommunicatorRequest request = { + .hdr.tag = T_NeonGetPageRequest, + .hdr.bufid = InvalidBuffer, + .hdr.lsn = request_lsns.request_lsn, + .hdr.not_modified_since = request_lsns.not_modified_since, + .page.rinfo = InfoFromSMgrRel(reln), + .page.forknum = forknum, + .page.blocknum = blocknum + i, + }; + communicator_send_request(get_dhard_number(InfoFromSMgrRel(reln), blocknum + i), &request); } - - tag.blockNum = blocknum; - - for (int i = 0; i < PG_IOV_MAX / 8; i++) - lfc_present[i] = ~(lfc_present[i]); - - ring_index = prefetch_register_bufferv(tag, NULL, iterblocks, - lfc_present, true); - nblocks -= iterblocks; - blocknum += iterblocks; - - Assert(ring_index < MyPState->ring_unused && - MyPState->ring_last <= ring_index); } - - prefetch_pump_state(); - return false; } @@ -2871,7 +1401,6 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, static bool neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) { - uint64 ring_index PG_USED_FOR_ASSERTS_ONLY; BufferTag tag; switch (reln->smgr_relpersistence) @@ -2888,21 +1417,19 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - if (lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum)) - return false; - - tag.forkNum = forknum; - tag.blockNum = blocknum; - - CopyNRelFileInfoToBufTag(tag, InfoFromSMgrRel(reln)); - - ring_index = prefetch_register_bufferv(tag, NULL, 1, NULL, true); - - Assert(ring_index < MyPState->ring_unused && - MyPState->ring_last <= ring_index); - - prefetch_pump_state(); - + if (!lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum)) + { + NeonCommunicatorRequest request = { + .hdr.tag = T_NeonGetPageRequest, + .hdr.bufid = InvalidBuffer, + .hdr.lsn = request_lsns.request_lsn, + .hdr.not_modified_since = request_lsns.not_modified_since, + .page.rinfo = InfoFromSMgrRel(reln), + .page.forknum = forknum, + .page.blocknum = blocknum + i, + }; + communicator_send_request(get_dhard_number(InfoFromSMgrRel(reln), blocknum), &request); + } return false; } #endif /* PG_MAJORVERSION_NUM < 17 */ @@ -2953,194 +1480,35 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum, #endif } +static Buffer +GetBufferId(void const* ptr) +{ + Buffer buf = (size_t)(ptr - BufferBlocks)/BLCKSZ; + Assert(buf < NBuffers); + return buf+1; +} + static void #if PG_MAJORVERSION_NUM < 16 -neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_blockno, neon_request_lsns *request_lsns, +neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber base_blockno, neon_request_lsns *request_lsns, char **buffers, BlockNumber nblocks, const bits8 *mask) #else -neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_blockno, neon_request_lsns *request_lsns, +neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blocknum, neon_request_lsns *request_lsns, void **buffers, BlockNumber nblocks, const bits8 *mask) #endif { - NeonResponse *resp; - uint64 ring_index; - PrfHashEntry *entry; - PrefetchRequest *slot; - PrefetchRequest hashkey; - - Assert(PointerIsValid(request_lsns)); - Assert(nblocks >= 1); - - /* - * 
Use an intermediate PrefetchRequest struct as the hash key to ensure - * correct alignment and that the padding bytes are cleared. - */ - memset(&hashkey.buftag, 0, sizeof(BufferTag)); - CopyNRelFileInfoToBufTag(hashkey.buftag, rinfo); - hashkey.buftag.forkNum = forkNum; - hashkey.buftag.blockNum = base_blockno; - - /* - * The redo process does not lock pages that it needs to replay but are - * not in the shared buffers, so a concurrent process may request the page - * after redo has decided it won't redo that page and updated the LwLSN - * for that page. If we're in hot standby we need to take care that we - * don't return until after REDO has finished replaying up to that LwLSN, - * as the page should have been locked up to that point. - * - * See also the description on neon_redo_read_buffer_filter below. - * - * NOTE: It is possible that the WAL redo process will still do IO due to - * concurrent failed read IOs. Those IOs should never have a request_lsn - * that is as large as the WAL record we're currently replaying, if it - * weren't for the behaviour of the LwLsn cache that uses the highest - * value of the LwLsn cache when the entry is not found. - */ - prefetch_register_bufferv(hashkey.buftag, request_lsns, nblocks, mask, false); - for (int i = 0; i < nblocks; i++) { - void *buffer = buffers[i]; - BlockNumber blockno = base_blockno + i; - neon_request_lsns *reqlsns = &request_lsns[i]; - TimestampTz start_ts, end_ts; - - if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i)) - continue; - - start_ts = GetCurrentTimestamp(); - - if (RecoveryInProgress() && MyBackendType != B_STARTUP) - XLogWaitForReplayOf(reqlsns->request_lsn); - - /* - * Try to find prefetched page in the list of received pages. - */ -Retry: - hashkey.buftag.blockNum = blockno; - entry = prfh_lookup(MyPState->prf_hash, &hashkey); - - if (entry != NULL) - { - slot = entry->slot; - if (neon_prefetch_response_usable(reqlsns, slot)) - { - ring_index = slot->my_ring_index; - } - else - { - /* - * Cannot use this prefetch, discard it - * - * We can't drop cache for not-yet-received requested items. It is - * unlikely this happens, but it can happen if prefetch distance - * is large enough and a backend didn't consume all prefetch - * requests. - */ - if (slot->status == PRFS_REQUESTED) - { - if (!prefetch_wait_for(slot->my_ring_index)) - goto Retry; - } - /* drop caches */ - prefetch_set_unused(slot->my_ring_index); - pgBufferUsage.prefetch.expired += 1; - MyNeonCounters->getpage_prefetch_discards_total++; - /* make it look like a prefetch cache miss */ - entry = NULL; - } - } - - do - { - if (entry == NULL) - { - ring_index = prefetch_register_bufferv(hashkey.buftag, reqlsns, 1, NULL, false); - Assert(ring_index != UINT64_MAX); - slot = GetPrfSlot(ring_index); - } - else - { - /* - * Empty our reference to the prefetch buffer's hash entry. When - * we wait for prefetches, the entry reference is invalidated by - * potential updates to the hash, and when we reconnect to the - * pageserver the prefetch we're waiting for may be dropped, in - * which case we need to retry and take the branch above. 
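This retry loop exists because the old read path had to pull every response back through the backend. The replacement read path further below instead stamps each request with its destination shared buffer (hdr.bufid, computed by GetBufferId()), so the page can be placed without an extra copy in the backend. The delivery side is not part of these hunks; the sketch below assumes the obvious inverse of GetBufferId():

/*
 * Assumed inverse of GetBufferId(): where a received page would be copied
 * when the request carried a target buffer id.  Illustrative only; the
 * locking and error handling of the real delivery code are omitted.
 */
static void
deliver_page_sketch(Buffer bufid, const char *page)
{
	if (bufid != InvalidBuffer)
	{
		char	   *dst = BufferBlocks + (size_t) (bufid - 1) * BLCKSZ;

		memcpy(dst, page, BLCKSZ);
	}
	/* with InvalidBuffer (e.g. a prefetch) there is no destination to fill */
}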
- */ - entry = NULL; - } - - Assert(slot->my_ring_index == ring_index); - Assert(MyPState->ring_last <= ring_index && - MyPState->ring_unused > ring_index); - Assert(slot->status != PRFS_UNUSED); - Assert(GetPrfSlot(ring_index) == slot); - - } while (!prefetch_wait_for(ring_index)); - - Assert(slot->status == PRFS_RECEIVED); - Assert(memcmp(&hashkey.buftag, &slot->buftag, sizeof(BufferTag)) == 0); - Assert(hashkey.buftag.blockNum == base_blockno + i); - - resp = slot->response; - - switch (resp->tag) - { - case T_NeonGetPageResponse: - { - NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp; - if (neon_protocol_version >= 3) - { - if (resp->reqid != slot->reqid || - resp->lsn != slot->request_lsns.request_lsn || - resp->not_modified_since != slot->request_lsns.not_modified_since || - !RelFileInfoEquals(getpage_resp->req.rinfo, rinfo) || - getpage_resp->req.forknum != forkNum || - getpage_resp->req.blkno != base_blockno + i) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno, - slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), forkNum, base_blockno + i); - } - } - memcpy(buffer, getpage_resp->page, BLCKSZ); - lfc_write(rinfo, forkNum, blockno, buffer); - break; - } - case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (resp->reqid != slot->reqid || - resp->lsn != slot->request_lsns.request_lsn || - resp->not_modified_since != slot->request_lsns.not_modified_since) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since)); - } - } - ereport(ERROR, - (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[shard %d, reqid %lx] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", - slot->shard_no, resp->reqid, blockno, RelFileInfoFmt(rinfo), - forkNum, LSN_FORMAT_ARGS(reqlsns->effective_request_lsn)), - errdetail("page server returned error: %s", - ((NeonErrorResponse *) resp)->message))); - break; - default: - NEON_PANIC_CONNECTION_STATE(slot->shard_no, PANIC, - "Expected GetPage (0x%02x) or Error (0x%02x) response to GetPageRequest, but got 0x%02x", - T_NeonGetPageResponse, T_NeonErrorResponse, resp->tag); - } - - /* buffer was used, clean up for later reuse */ - prefetch_set_unused(ring_index); - prefetch_cleanup_trailing_unused(); - - end_ts = GetCurrentTimestamp(); - inc_getpage_wait(end_ts >= start_ts ? 
(end_ts - start_ts) : 0); + NeonCommunicatorRequest request = { + .hdr.tag = T_NeonGetPageRequest, + .hdr.bufid = GetBufferId(buffers[i]), + .hdr.lsn = request_lsns.request_lsn, + .hdr.not_modified_since = request_lsns.not_modified_since, + .page.rinfo = InfoFromSMgrRel(reln), + .page.forknum = forknum, + .page.blocknum = blocknum + i, + }; + (void)communicator_request(get_dhard_number(InfoFromSMgrRel(reln), blocknum + i), &request); } } @@ -3201,8 +1569,6 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1, NULL); neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer); - prefetch_pump_state(); - #ifdef DEBUG_COMPARE_LOCAL if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln)) { @@ -3283,7 +1649,6 @@ static void neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, void **buffers, BlockNumber nblocks) { - bits8 read[PG_IOV_MAX / 8]; neon_request_lsns request_lsns[PG_IOV_MAX]; int lfc_result; @@ -3340,8 +1705,6 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, neon_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, buffers, nblocks, read); - prefetch_pump_state(); - #ifdef DEBUG_COMPARE_LOCAL if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln)) { @@ -3510,8 +1873,6 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo lfc_write(InfoFromSMgrRel(reln), forknum, blocknum, buffer); - prefetch_pump_state(); - #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) #if PG_MAJORVERSION_NUM >= 17 @@ -3565,8 +1926,6 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno, lfc_writev(InfoFromSMgrRel(reln), forknum, blkno, buffers, nblocks); - prefetch_pump_state(); - #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) mdwritev(reln, forknum, blocknum, &buffer, 1, skipFsync); @@ -3581,7 +1940,6 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno, static BlockNumber neon_nblocks(SMgrRelation reln, ForkNumber forknum) { - NeonResponse *resp; BlockNumber n_blocks; neon_request_lsns request_lsns; @@ -3614,74 +1972,16 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL); { - NeonNblocksRequest request = { + NeonCommunicatorRequest request = { .hdr.tag = T_NeonNblocksRequest, - .hdr.reqid = GENERATE_REQUEST_ID(), + .hdr.bufid = InvalidBuffer, .hdr.lsn = request_lsns.request_lsn, .hdr.not_modified_since = request_lsns.not_modified_since, - .rinfo = InfoFromSMgrRel(reln), - .forknum = forknum, + .nblocks.rinfo = InfoFromSMgrRel(reln), + .nblocks.forknum = forknum, }; - - resp = page_server_request(&request); - - switch (resp->tag) - { - case T_NeonNblocksResponse: - { - NeonNblocksResponse * relsize_resp = (NeonNblocksResponse *) resp; - if (neon_protocol_version >= 3) - { - if (!equal_requests(resp, &request.hdr) || - !RelFileInfoEquals(relsize_resp->req.rinfo, request.rinfo) || - relsize_resp->req.forknum != forknum) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(relsize_resp->req.rinfo), relsize_resp->req.forknum, - request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), RelFileInfoFmt(request.rinfo), forknum); 
- } - } - n_blocks = relsize_resp->n_blocks; - break; - } - case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (!equal_requests(resp, &request.hdr)) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since)); - } - } - ereport(ERROR, - (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[reqid %lx] could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X", - resp->reqid, - RelFileInfoFmt(InfoFromSMgrRel(reln)), - forknum, - LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), - errdetail("page server returned error: %s", - ((NeonErrorResponse *) resp)->message))); - break; - - default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Expected Nblocks (0x%02x) or Error (0x%02x) response to NblocksRequest, but got 0x%02x", - T_NeonNblocksResponse, T_NeonErrorResponse, resp->tag); - } - update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks); - - neon_log(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks", - RelFileInfoFmt(InfoFromSMgrRel(reln)), - forknum, - LSN_FORMAT_ARGS(request_lsns.effective_request_lsn), - n_blocks); - - pfree(resp); + return (BlockNumber)communicator_request(0, &request); } - return n_blocks; } /* @@ -3690,8 +1990,6 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) int64 neon_dbsize(Oid dbNode) { - NeonResponse *resp; - int64 db_size; neon_request_lsns request_lsns; NRelFileInfo dummy_node = {0}; @@ -3699,66 +1997,15 @@ neon_dbsize(Oid dbNode) REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL); { - NeonDbSizeRequest request = { + NeonCommunicatorRequest request = { .hdr.tag = T_NeonDbSizeRequest, - .hdr.reqid = GENERATE_REQUEST_ID(), + .hdr.bufid = InvalidBuffer, .hdr.lsn = request_lsns.request_lsn, .hdr.not_modified_since = request_lsns.not_modified_since, - .dbNode = dbNode, + .dbsize.dbNode = dbNode, }; - - resp = page_server_request(&request); - - switch (resp->tag) - { - case T_NeonDbSizeResponse: - { - NeonDbSizeResponse* dbsize_resp = (NeonDbSizeResponse *) resp; - if (neon_protocol_version >= 3) - { - if (!equal_requests(resp, &request.hdr) || - dbsize_resp->req.dbNode != dbNode) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u} to get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), dbsize_resp->req.dbNode, - request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), dbNode); - } - } - db_size = dbsize_resp->db_size; - break; - } - case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (!equal_requests(resp, &request.hdr)) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since)); - } - } - ereport(ERROR, - (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[reqid %lx] could not read db size of db %u from page server at lsn %X/%08X", - resp->reqid, - dbNode, 
LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), - errdetail("page server returned error: %s", - ((NeonErrorResponse *) resp)->message))); - break; - - default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Expected DbSize (0x%02x) or Error (0x%02x) response to DbSizeRequest, but got 0x%02x", - T_NeonDbSizeResponse, T_NeonErrorResponse, resp->tag); - } - - neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes", - dbNode, LSN_FORMAT_ARGS(request_lsns.effective_request_lsn), db_size); - - pfree(resp); + return communicator_request(0, &request); } - return db_size; } /* @@ -4045,119 +2292,8 @@ neon_end_unlogged_build(SMgrRelation reln) static int neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buffer) { - XLogRecPtr request_lsn, - not_modified_since; - SlruKind kind; - int n_blocks; - shardno_t shard_no = 0; /* All SLRUs are at shard 0 */ - NeonResponse *resp; - NeonGetSlruSegmentRequest request; - - /* - * Compute a request LSN to use, similar to neon_get_request_lsns() but the - * logic is a bit simpler. - */ - if (RecoveryInProgress()) - { - request_lsn = GetXLogReplayRecPtr(NULL); - if (request_lsn == InvalidXLogRecPtr) - { - /* - * This happens in neon startup, we start up without replaying any - * records. - */ - request_lsn = GetRedoStartLsn(); - } - request_lsn = nm_adjust_lsn(request_lsn); - } - else - request_lsn = UINT64_MAX; - - /* - * GetRedoStartLsn() returns LSN of the basebackup. We know that the SLRU - * segment has not changed since the basebackup, because in order to - * modify it, we would have had to download it already. And once - * downloaded, we never evict SLRU segments from local disk. - */ - not_modified_since = nm_adjust_lsn(GetRedoStartLsn()); - - if (STRPREFIX(path, "pg_xact")) - kind = SLRU_CLOG; - else if (STRPREFIX(path, "pg_multixact/members")) - kind = SLRU_MULTIXACT_MEMBERS; - else if (STRPREFIX(path, "pg_multixact/offsets")) - kind = SLRU_MULTIXACT_OFFSETS; - else - return -1; - - request = (NeonGetSlruSegmentRequest) { - .hdr.tag = T_NeonGetSlruSegmentRequest, - .hdr.reqid = GENERATE_REQUEST_ID(), - .hdr.lsn = request_lsn, - .hdr.not_modified_since = not_modified_since, - .kind = kind, - .segno = segno - }; - - do - { - while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no)); - - consume_prefetch_responses(); - - resp = page_server->receive(shard_no); - } while (resp == NULL); - - switch (resp->tag) - { - case T_NeonGetSlruSegmentResponse: - { - NeonGetSlruSegmentResponse* slru_resp = (NeonGetSlruSegmentResponse *) resp; - if (neon_protocol_version >= 3) - { - if (!equal_requests(resp, &request.hdr) || - slru_resp->req.kind != kind || - slru_resp->req.segno != segno) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u} to get SLRU segment request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), slru_resp->req.kind, slru_resp->req.segno, - request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), kind, segno); - } - } - n_blocks = slru_resp->n_blocks; - memcpy(buffer, slru_resp->data, n_blocks*BLCKSZ); - break; - } - case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (!equal_requests(resp, &request.hdr)) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get SLRU segment request {reqid=%lx,lsn=%X/%08X, 
since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since)); - } - } - ereport(ERROR, - (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[reqid %lx] could not read SLRU %d segment %d at lsn %X/%08X", - resp->reqid, - kind, - segno, - LSN_FORMAT_ARGS(request_lsn)), - errdetail("page server returned error: %s", - ((NeonErrorResponse *) resp)->message))); - break; - - default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Expected GetSlruSegment (0x%02x) or Error (0x%02x) response to GetSlruSegmentRequest, but got 0x%02x", - T_NeonGetSlruSegmentResponse, T_NeonErrorResponse, resp->tag); - } - pfree(resp); - - return n_blocks; + /* TODO: support on-demand dopwnload */ + Assert(false); } static void @@ -4278,25 +2414,17 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, * This length is later reused when we open the smgr to read the * block, which is fine and expected. */ - NeonResponse *response; - NeonNblocksResponse *nbresponse; - NeonNblocksRequest request = { - .hdr = (NeonRequest) { - .tag = T_NeonNblocksRequest, - .reqid = GENERATE_REQUEST_ID(), - .lsn = end_recptr, - .not_modified_since = end_recptr, - }, - .rinfo = rinfo, - .forknum = forknum, + NeonCommunicatorRequest request = { + .hdr.tag = T_NeonNblocksRequest, + .hdr.bufid = InvalidBuffer, + .hdr.lsn = end_recptr, + .hdr.not_modified_since = end_recptr, + .nblocks.rinfo = InfoFromSMgrRel(reln), + .nblocks.forknum = forknum, }; + relsize = (BlockNumber)communicator_request(0, &request); - response = page_server_request(&request); - - Assert(response->tag == T_NeonNblocksResponse); - nbresponse = (NeonNblocksResponse *) response; - - relsize = Max(nbresponse->n_blocks, blkno + 1); + relsize = Max(relsize, blkno + 1); set_cached_relsize(rinfo, forknum, relsize); SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);