diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 894c9729f2..0eb1acbfb0 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -15,6 +15,7 @@ #include "postgres.h" #include "access/xlog.h" +#include "common/hashfn.h" #include "fmgr.h" #include "libpq-fe.h" #include "libpq/libpq.h" @@ -38,17 +39,6 @@ #define MIN_RECONNECT_INTERVAL_USEC 1000 #define MAX_RECONNECT_INTERVAL_USEC 1000000 -bool connected = false; -PGconn *pageserver_conn = NULL; - -/* - * WaitEventSet containing: - * - WL_SOCKET_READABLE on pageserver_conn, - * - WL_LATCH_SET on MyLatch, and - * - WL_EXIT_ON_PM_DEATH. - */ -WaitEventSet *pageserver_conn_wes = NULL; - /* GUCs */ char *neon_timeline; char *neon_tenant; @@ -59,17 +49,25 @@ char *neon_auth_token; int readahead_buffer_size = 128; int flush_every_n_requests = 8; -static int n_reconnect_attempts = 0; -static int max_reconnect_attempts = 60; +static int n_reconnect_attempts = 0; +static int max_reconnect_attempts = 60; +static int stripe_size; -#define MAX_PAGESERVER_CONNSTRING_SIZE 256 +typedef struct +{ + char connstring[MAX_SHARDS][MAX_PAGESERVER_CONNSTRING_SIZE]; + size_t num_shards; +} ShardMap; /* + * PagestoreShmemState is kept in shared memory. It contains the connection + * strings for each shard. + * * The "neon.pageserver_connstring" GUC is marked with the PGC_SIGHUP option, * allowing it to be changed using pg_reload_conf(). The control plane can * update the connection string if the pageserver crashes, is relocated, or - * new shards are added. A copy of the current value of the GUC is kept in - * shared memory, updated by the postmaster, because regular backends don't + * new shards are added. A parsed copy of the current value of the GUC is kept + * in shared memory, updated by the postmaster, because regular backends don't * reload the config during query execution, but we might need to re-establish * the pageserver connection with the new connection string even in the middle * of a query. @@ -84,7 +82,7 @@ typedef struct { pg_atomic_uint64 begin_update_counter; pg_atomic_uint64 end_update_counter; - char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE]; + ShardMap shard_map; } PagestoreShmemState; #if PG_VERSION_NUM >= 150000 @@ -94,10 +92,25 @@ static void walproposer_shmem_request(void); static shmem_startup_hook_type prev_shmem_startup_hook; static PagestoreShmemState *pagestore_shared; static uint64 pagestore_local_counter = 0; -static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE]; -static bool pageserver_flush(void); -static void pageserver_disconnect(void); +/* This backend's per-shard connections */ +typedef struct +{ + PGconn *conn; + + /*--- + * WaitEventSet containing: + * - WL_SOCKET_READABLE on 'conn' + * - WL_LATCH_SET on MyLatch, and + * - WL_EXIT_ON_PM_DEATH. + */ + WaitEventSet *wes; +} PageServer; + +static PageServer page_servers[MAX_SHARDS]; + +static bool pageserver_flush(shardno_t shard_no); +static void pageserver_disconnect(shardno_t shard_no); static bool PagestoreShmemIsValid(void) @@ -105,89 +118,213 @@ PagestoreShmemIsValid(void) return pagestore_shared && UsedShmemSegAddr; } +/* + * Parse a comma-separated list of connection strings into a ShardMap. + * + * If 'result' is NULL, just checks that the input is valid. If the input is + * not valid, returns false. The contents of *result are undefined in + * that case, and must not be relied on. + */ +static bool +ParseShardMap(const char *connstr, ShardMap *result) +{ + const char *p; + int nshards = 0; + + if (result) + memset(result, 0, sizeof(ShardMap)); + + p = connstr; + nshards = 0; + for (;;) + { + const char *sep; + size_t connstr_len; + + sep = strchr(p, ','); + connstr_len = sep != NULL ? sep - p : strlen(p); + + if (connstr_len == 0 && sep == NULL) + break; /* ignore trailing comma */ + + if (nshards >= MAX_SHARDS) + { + neon_log(LOG, "Too many shards"); + return false; + } + if (connstr_len >= MAX_PAGESERVER_CONNSTRING_SIZE) + { + neon_log(LOG, "Connection string too long"); + return false; + } + if (result) + { + memcpy(result->connstring[nshards], p, connstr_len); + result->connstring[nshards][connstr_len] = '\0'; + } + nshards++; + + if (sep == NULL) + break; + p = sep + 1; + } + if (result) + result->num_shards = nshards; + + return true; +} + static bool CheckPageserverConnstring(char **newval, void **extra, GucSource source) { - return strlen(*newval) < MAX_PAGESERVER_CONNSTRING_SIZE; + char *p = *newval; + + return ParseShardMap(p, NULL); } static void AssignPageserverConnstring(const char *newval, void *extra) { + ShardMap shard_map; + /* * Only postmaster updates the copy in shared memory. */ if (!PagestoreShmemIsValid() || IsUnderPostmaster) return; - pg_atomic_add_fetch_u64(&pagestore_shared->begin_update_counter, 1); - pg_write_barrier(); - strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE); - pg_write_barrier(); - pg_atomic_add_fetch_u64(&pagestore_shared->end_update_counter, 1); -} - -static bool -CheckConnstringUpdated(void) -{ - if (!PagestoreShmemIsValid()) - return false; - return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->begin_update_counter); + if (!ParseShardMap(newval, &shard_map)) + { + /* + * shouldn't happen, because we already checked the value in + * CheckPageserverConnstring + */ + elog(ERROR, "could not parse shard map"); + } + + if (memcmp(&pagestore_shared->shard_map, &shard_map, sizeof(ShardMap)) != 0) + { + pg_atomic_add_fetch_u64(&pagestore_shared->begin_update_counter, 1); + pg_write_barrier(); + memcpy(&pagestore_shared->shard_map, &shard_map, sizeof(ShardMap)); + pg_write_barrier(); + pg_atomic_add_fetch_u64(&pagestore_shared->end_update_counter, 1); + } + else + { + /* no change */ + } } +/* + * Get the current number of shards, and/or the connection string for a + * particular shard from the shard map in shared memory. + * + * If num_shards_p is not NULL, it is set to the current number of shards. + * + * If connstr_p is not NULL, the connection string for 'shard_no' is copied to + * it. It must point to a buffer at least MAX_PAGESERVER_CONNSTRING_SIZE bytes + * long. + * + * As a side-effect, if the shard map in shared memory had changed since the + * last call, terminates all existing connections to all pageservers. + */ static void -ReloadConnstring(void) +load_shard_map(shardno_t shard_no, char *connstr_p, shardno_t *num_shards_p) { uint64 begin_update_counter; uint64 end_update_counter; - - if (!PagestoreShmemIsValid()) - return; + ShardMap *shard_map = &pagestore_shared->shard_map; + shardno_t num_shards; /* - * Copy the current settnig from shared to local memory. Postmaster can - * update the value concurrently, in which case we would copy a garbled - * mix of the old and new values. We will detect it because the counter's - * won't match, and retry. But it's important that we don't do anything - * within the retry-loop that would depend on the string having valid - * contents. + * Postmaster can update the shared memory values concurrently, in which + * case we would copy a garbled mix of the old and new values. We will + * detect it because the counter's won't match, and retry. But it's + * important that we don't do anything within the retry-loop that would + * depend on the string having valid contents. */ do { begin_update_counter = pg_atomic_read_u64(&pagestore_shared->begin_update_counter); end_update_counter = pg_atomic_read_u64(&pagestore_shared->end_update_counter); - pg_read_barrier(); - strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring)); - pg_read_barrier(); + num_shards = shard_map->num_shards; + if (connstr_p && shard_no < MAX_SHARDS) + strlcpy(connstr_p, shard_map->connstring[shard_no], MAX_PAGESERVER_CONNSTRING_SIZE); + pg_memory_barrier(); } while (begin_update_counter != end_update_counter || begin_update_counter != pg_atomic_read_u64(&pagestore_shared->begin_update_counter) || end_update_counter != pg_atomic_read_u64(&pagestore_shared->end_update_counter)); - pagestore_local_counter = end_update_counter; + if (connstr_p && shard_no >= num_shards) + neon_log(ERROR, "Shard %d is greater or equal than number of shards %d", + shard_no, num_shards); + + /* + * If any of the connection strings changed, reset all connections. + */ + if (pagestore_local_counter != end_update_counter) + { + for (shardno_t i = 0; i < MAX_SHARDS; i++) + { + if (page_servers[i].conn) + pageserver_disconnect(i); + } + pagestore_local_counter = end_update_counter; + } + + if (num_shards_p) + *num_shards_p = num_shards; +} + +#define MB (1024*1024) + +shardno_t +get_shard_number(BufferTag *tag) +{ + shardno_t n_shards; + uint32 hash; + + load_shard_map(0, NULL, &n_shards); + +#if PG_MAJORVERSION_NUM < 16 + hash = murmurhash32(tag->rnode.relNode); + hash = hash_combine(hash, murmurhash32(tag->blockNum / stripe_size)); +#else + hash = murmurhash32(tag->relNumber); + hash = hash_combine(hash, murmurhash32(tag->blockNum / stripe_size)); +#endif + + return hash % n_shards; } static bool -pageserver_connect(int elevel) +pageserver_connect(shardno_t shard_no, int elevel) { char *query; int ret; const char *keywords[3]; const char *values[3]; int n; + PGconn *conn; + WaitEventSet *wes; + char connstr[MAX_PAGESERVER_CONNSTRING_SIZE]; static TimestampTz last_connect_time = 0; static uint64_t delay_us = MIN_RECONNECT_INTERVAL_USEC; TimestampTz now; uint64_t us_since_last_connect; - Assert(!connected); + Assert(page_servers[shard_no].conn == NULL); - if (CheckConnstringUpdated()) - { - ReloadConnstring(); - } + /* + * Get the connection string for this shard. If the shard map has been + * updated since we last looked, this will also disconnect any existing + * pageserver connections as a side effect. + */ + load_shard_map(shard_no, connstr, NULL); now = GetCurrentTimestamp(); us_since_last_connect = now - last_connect_time; @@ -223,76 +360,84 @@ pageserver_connect(int elevel) n++; } keywords[n] = "dbname"; - values[n] = local_pageserver_connstring; + values[n] = connstr; n++; keywords[n] = NULL; values[n] = NULL; n++; - pageserver_conn = PQconnectdbParams(keywords, values, 1); + conn = PQconnectdbParams(keywords, values, 1); - if (PQstatus(pageserver_conn) == CONNECTION_BAD) + if (PQstatus(conn) == CONNECTION_BAD) { - char *msg = pchomp(PQerrorMessage(pageserver_conn)); + char *msg = pchomp(PQerrorMessage(conn)); - PQfinish(pageserver_conn); - pageserver_conn = NULL; + PQfinish(conn); ereport(elevel, (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), - errmsg(NEON_TAG "could not establish connection to pageserver"), + errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no), errdetail_internal("%s", msg))); + pfree(msg); return false; } - query = psprintf("pagestream %s %s", neon_tenant, neon_timeline); - ret = PQsendQuery(pageserver_conn, query); + ret = PQsendQuery(conn, query); + pfree(query); if (ret != 1) { - PQfinish(pageserver_conn); - pageserver_conn = NULL; - neon_log(elevel, "could not send pagestream command to pageserver"); + PQfinish(conn); + neon_shard_log(shard_no, elevel, "could not send pagestream command to pageserver"); return false; } - pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3); - AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET, + wes = CreateWaitEventSet(TopMemoryContext, 3); + AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); - AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, + AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL); - AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL); + AddWaitEventToSet(wes, WL_SOCKET_READABLE, PQsocket(conn), NULL, NULL); - while (PQisBusy(pageserver_conn)) + PG_TRY(); { - WaitEvent event; - - /* Sleep until there's something to do */ - (void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION); - ResetLatch(MyLatch); - - CHECK_FOR_INTERRUPTS(); - - /* Data available in socket? */ - if (event.events & WL_SOCKET_READABLE) + while (PQisBusy(conn)) { - if (!PQconsumeInput(pageserver_conn)) + WaitEvent event; + + /* Sleep until there's something to do */ + (void) WaitEventSetWait(wes, -1L, &event, 1, PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Data available in socket? */ + if (event.events & WL_SOCKET_READABLE) { - char *msg = pchomp(PQerrorMessage(pageserver_conn)); + if (!PQconsumeInput(conn)) + { + char *msg = pchomp(PQerrorMessage(conn)); - PQfinish(pageserver_conn); - pageserver_conn = NULL; - FreeWaitEventSet(pageserver_conn_wes); - pageserver_conn_wes = NULL; + PQfinish(conn); + FreeWaitEventSet(wes); - neon_log(elevel, "could not complete handshake with pageserver: %s", - msg); - return false; + neon_shard_log(shard_no, elevel, "could not complete handshake with pageserver: %s", + msg); + return false; + } } } } + PG_CATCH(); + { + PQfinish(conn); + FreeWaitEventSet(wes); + PG_RE_THROW(); + } + PG_END_TRY(); - neon_log(LOG, "libpagestore: connected to '%s'", page_server_connstring); + neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s'", connstr); + page_servers[shard_no].conn = conn; + page_servers[shard_no].wes = wes; - connected = true; return true; } @@ -300,9 +445,10 @@ pageserver_connect(int elevel) * A wrapper around PQgetCopyData that checks for interrupts while sleeping. */ static int -call_PQgetCopyData(char **buffer) +call_PQgetCopyData(shardno_t shard_no, char **buffer) { int ret; + PGconn *pageserver_conn = page_servers[shard_no].conn; retry: ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ ); @@ -312,7 +458,7 @@ retry: WaitEvent event; /* Sleep until there's something to do */ - (void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION); + (void) WaitEventSetWait(page_servers[shard_no].wes, -1L, &event, 1, PG_WAIT_EXTENSION); ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); @@ -324,7 +470,7 @@ retry: { char *msg = pchomp(PQerrorMessage(pageserver_conn)); - neon_log(LOG, "could not get response from pageserver: %s", msg); + neon_shard_log(shard_no, LOG, "could not get response from pageserver: %s", msg); pfree(msg); return -1; } @@ -338,7 +484,7 @@ retry: static void -pageserver_disconnect(void) +pageserver_disconnect(shardno_t shard_no) { /* * If anything goes wrong while we were sending a request, it's not clear @@ -347,38 +493,38 @@ pageserver_disconnect(void) * time later after we have already sent a new unrelated request. Close * the connection to avoid getting confused. */ - if (connected) + if (page_servers[shard_no].conn) { - neon_log(LOG, "dropping connection to page server due to error"); - PQfinish(pageserver_conn); - pageserver_conn = NULL; - connected = false; + neon_shard_log(shard_no, LOG, "dropping connection to page server due to error"); + PQfinish(page_servers[shard_no].conn); + page_servers[shard_no].conn = NULL; + /* + * If the connection to any pageserver is lost, we throw away the + * whole prefetch queue, even for other pageservers. It should not + * cause big problems, because connection loss is supposed to be a + * rare event. + */ prefetch_on_ps_disconnect(); } - if (pageserver_conn_wes != NULL) + if (page_servers[shard_no].wes != NULL) { - FreeWaitEventSet(pageserver_conn_wes); - pageserver_conn_wes = NULL; + FreeWaitEventSet(page_servers[shard_no].wes); + page_servers[shard_no].wes = NULL; } } static bool -pageserver_send(NeonRequest *request) +pageserver_send(shardno_t shard_no, NeonRequest *request) { StringInfoData req_buff; - - if (CheckConnstringUpdated()) - { - pageserver_disconnect(); - ReloadConnstring(); - } + PGconn *pageserver_conn = page_servers[shard_no].conn; /* If the connection was lost for some reason, reconnect */ - if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD) + if (pageserver_conn && PQstatus(pageserver_conn) == CONNECTION_BAD) { - neon_log(LOG, "pageserver_send disconnect bad connection"); - pageserver_disconnect(); + neon_shard_log(shard_no, LOG, "pageserver_send disconnect bad connection"); + pageserver_disconnect(shard_no); } req_buff = nm_pack_request(request); @@ -392,9 +538,9 @@ pageserver_send(NeonRequest *request) * https://github.com/neondatabase/neon/issues/1138 So try to reestablish * connection in case of failure. */ - if (!connected) + if (!page_servers[shard_no].conn) { - while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR)) + while (!pageserver_connect(shard_no, n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR)) { HandleMainLoopInterrupts(); n_reconnect_attempts += 1; @@ -402,6 +548,8 @@ pageserver_send(NeonRequest *request) n_reconnect_attempts = 0; } + pageserver_conn = page_servers[shard_no].conn; + /* * Send request. * @@ -414,8 +562,8 @@ pageserver_send(NeonRequest *request) { char *msg = pchomp(PQerrorMessage(pageserver_conn)); - pageserver_disconnect(); - neon_log(LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg); + pageserver_disconnect(shard_no); + neon_shard_log(shard_no, LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg); pfree(msg); pfree(req_buff.data); return false; @@ -427,19 +575,20 @@ pageserver_send(NeonRequest *request) { char *msg = nm_to_string((NeonMessage *) request); - neon_log(PageStoreTrace, "sent request: %s", msg); + neon_shard_log(shard_no, PageStoreTrace, "sent request: %s", msg); pfree(msg); } return true; } static NeonResponse * -pageserver_receive(void) +pageserver_receive(shardno_t shard_no) { StringInfoData resp_buff; NeonResponse *resp; + PGconn *pageserver_conn = page_servers[shard_no].conn; - if (!connected) + if (!pageserver_conn) return NULL; PG_TRY(); @@ -447,7 +596,7 @@ pageserver_receive(void) /* read response */ int rc; - rc = call_PQgetCopyData(&resp_buff.data); + rc = call_PQgetCopyData(shard_no, &resp_buff.data); if (rc >= 0) { resp_buff.len = rc; @@ -459,33 +608,33 @@ pageserver_receive(void) { char *msg = nm_to_string((NeonMessage *) resp); - neon_log(PageStoreTrace, "got response: %s", msg); + neon_shard_log(shard_no, PageStoreTrace, "got response: %s", msg); pfree(msg); } } else if (rc == -1) { - neon_log(LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn))); - pageserver_disconnect(); + neon_shard_log(shard_no, LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn))); + pageserver_disconnect(shard_no); resp = NULL; } else if (rc == -2) { char *msg = pchomp(PQerrorMessage(pageserver_conn)); - pageserver_disconnect(); - neon_log(ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg); + pageserver_disconnect(shard_no); + neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg); } else { - pageserver_disconnect(); - neon_log(ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc); + pageserver_disconnect(shard_no); + neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc); } } PG_CATCH(); { - neon_log(LOG, "pageserver_receive disconnect due to caught exception"); - pageserver_disconnect(); + neon_shard_log(shard_no, LOG, "pageserver_receive disconnect due to caught exception"); + pageserver_disconnect(shard_no); PG_RE_THROW(); } PG_END_TRY(); @@ -495,11 +644,13 @@ pageserver_receive(void) static bool -pageserver_flush(void) +pageserver_flush(shardno_t shard_no) { - if (!connected) + PGconn *pageserver_conn = page_servers[shard_no].conn; + + if (!pageserver_conn) { - neon_log(WARNING, "Tried to flush while disconnected"); + neon_shard_log(shard_no, WARNING, "Tried to flush while disconnected"); } else { @@ -507,8 +658,8 @@ pageserver_flush(void) { char *msg = pchomp(PQerrorMessage(pageserver_conn)); - pageserver_disconnect(); - neon_log(LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg); + pageserver_disconnect(shard_no); + neon_shard_log(shard_no, LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg); pfree(msg); return false; } @@ -550,6 +701,7 @@ PagestoreShmemInit(void) { pg_atomic_init_u64(&pagestore_shared->begin_update_counter, 0); pg_atomic_init_u64(&pagestore_shared->end_update_counter, 0); + memset(&pagestore_shared->shard_map, 0, sizeof(ShardMap)); AssignPageserverConnstring(page_server_connstring, NULL); } LWLockRelease(AddinShmemInitLock); @@ -624,6 +776,15 @@ pg_init_libpagestore(void) 0, /* no flags required */ check_neon_id, NULL, NULL); + DefineCustomIntVariable("neon.stripe_size", + "sharding stripe size", + NULL, + &stripe_size, + 32768, 1, INT_MAX, + PGC_SIGHUP, + GUC_UNIT_BLOCKS, + NULL, NULL, NULL); + DefineCustomIntVariable("neon.max_cluster_size", "cluster size limit", NULL, diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 3fcaab0bee..8c02f357bc 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -20,9 +20,13 @@ #include "lib/stringinfo.h" #include "libpq/pqformat.h" #include "storage/block.h" +#include "storage/buf_internals.h" #include "storage/smgr.h" #include "utils/memutils.h" +#define MAX_SHARDS 128 +#define MAX_PAGESERVER_CONNSTRING_SIZE 256 + typedef enum { /* pagestore_client -> pagestore */ @@ -51,6 +55,9 @@ typedef struct #define neon_log(tag, fmt, ...) ereport(tag, \ (errmsg(NEON_TAG fmt, ##__VA_ARGS__), \ errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0))) +#define neon_shard_log(shard_no, tag, fmt, ...) ereport(tag, \ + (errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \ + errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0))) /* * supertype of all the Neon*Request structs below @@ -141,11 +148,13 @@ extern char *nm_to_string(NeonMessage *msg); * API */ +typedef unsigned shardno_t; + typedef struct { - bool (*send) (NeonRequest *request); - NeonResponse *(*receive) (void); - bool (*flush) (void); + bool (*send) (shardno_t shard_no, NeonRequest * request); + NeonResponse *(*receive) (shardno_t shard_no); + bool (*flush) (shardno_t shard_no); } page_server_api; extern void prefetch_on_ps_disconnect(void); @@ -159,6 +168,8 @@ extern char *neon_timeline; extern char *neon_tenant; extern int32 max_cluster_size; +extern shardno_t get_shard_number(BufferTag* tag); + extern const f_smgr *smgr_neon(BackendId backend, NRelFileInfo rinfo); extern void smgr_init_neon(void); extern void readahead_buffer_resize(int newsize, void *extra); diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 0db093e5a7..1fa802e6f4 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -172,6 +172,7 @@ typedef struct PrefetchRequest XLogRecPtr actual_request_lsn; NeonResponse *response; /* may be null */ PrefetchStatus status; + shardno_t shard_no; uint64 my_ring_index; } PrefetchRequest; @@ -239,10 +240,17 @@ typedef struct PrefetchState * also unused */ /* the buffers */ - prfh_hash *prf_hash; + prfh_hash *prf_hash; + int max_shard_no; + /* Mark shards involved in prefetch */ + uint8 shard_bitmap[(MAX_SHARDS + 7)/8]; PrefetchRequest prf_buffer[]; /* prefetch buffers */ } PrefetchState; +#define BITMAP_ISSET(bm, bit) ((bm)[(bit) >> 3] & (1 << ((bit) & 7))) +#define BITMAP_SET(bm, bit) (bm)[(bit) >> 3] |= (1 << ((bit) & 7)) +#define BITMAP_CLR(bm, bit) (bm)[(bit) >> 3] &= ~(1 << ((bit) & 7)) + static PrefetchState *MyPState; #define GetPrfSlot(ring_index) ( \ @@ -327,6 +335,7 @@ compact_prefetch_buffers(void) Assert(target_slot->status == PRFS_UNUSED); target_slot->buftag = source_slot->buftag; + target_slot->shard_no = source_slot->shard_no; target_slot->status = source_slot->status; target_slot->response = source_slot->response; target_slot->effective_request_lsn = source_slot->effective_request_lsn; @@ -494,6 +503,23 @@ prefetch_cleanup_trailing_unused(void) } } + +static bool +prefetch_flush_requests(void) +{ + for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++) + { + if (BITMAP_ISSET(MyPState->shard_bitmap, shard_no)) + { + if (!page_server->flush(shard_no)) + return false; + BITMAP_CLR(MyPState->shard_bitmap, shard_no); + } + } + MyPState->max_shard_no = 0; + return true; +} + /* * Wait for slot of ring_index to have received its response. * The caller is responsible for making sure the request buffer is flushed. @@ -509,7 +535,7 @@ prefetch_wait_for(uint64 ring_index) if (MyPState->ring_flush <= ring_index && MyPState->ring_unused > MyPState->ring_flush) { - if (!page_server->flush()) + if (!prefetch_flush_requests()) return false; MyPState->ring_flush = MyPState->ring_unused; } @@ -547,7 +573,7 @@ prefetch_read(PrefetchRequest *slot) Assert(slot->my_ring_index == MyPState->ring_receive); old = MemoryContextSwitchTo(MyPState->errctx); - response = (NeonResponse *) page_server->receive(); + response = (NeonResponse *) page_server->receive(slot->shard_no); MemoryContextSwitchTo(old); if (response) { @@ -704,12 +730,14 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force Assert(slot->response == NULL); Assert(slot->my_ring_index == MyPState->ring_unused); - while (!page_server->send((NeonRequest *) &request)); + while (!page_server->send(slot->shard_no, (NeonRequest *) &request)); /* update prefetch state */ MyPState->n_requests_inflight += 1; MyPState->n_unused -= 1; MyPState->ring_unused += 1; + BITMAP_SET(MyPState->shard_bitmap, slot->shard_no); + MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no); /* update slot state */ slot->status = PRFS_REQUESTED; @@ -880,6 +908,7 @@ Retry: * function reads the buffer tag from the slot. */ slot->buftag = tag; + slot->shard_no = get_shard_number(&tag); slot->my_ring_index = ring_index; prefetch_do_request(slot, force_latest, force_lsn); @@ -890,7 +919,7 @@ Retry: if (flush_every_n_requests > 0 && MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests) { - if (!page_server->flush()) + if (!prefetch_flush_requests()) { /* * Prefetch set is reset in case of error, so we should try to @@ -908,13 +937,44 @@ static NeonResponse * page_server_request(void const *req) { NeonResponse *resp; + BufferTag tag = {0}; + shardno_t shard_no; + + switch (((NeonRequest *) req)->tag) + { + case T_NeonExistsRequest: + CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo); + break; + case T_NeonNblocksRequest: + CopyNRelFileInfoToBufTag(tag, ((NeonNblocksRequest *) req)->rinfo); + break; + case T_NeonDbSizeRequest: + NInfoGetDbOid(BufTagGetNRelFileInfo(tag)) = ((NeonDbSizeRequest *) req)->dbNode; + break; + case T_NeonGetPageRequest: + CopyNRelFileInfoToBufTag(tag, ((NeonGetPageRequest *) req)->rinfo); + tag.blockNum = ((NeonGetPageRequest *) req)->blkno; + break; + default: + neon_log(ERROR, "Unexpected request tag: %d", ((NeonRequest *) req)->tag); + } + shard_no = get_shard_number(&tag); + + + /* + * Current sharding model assumes that all metadata is present only at shard 0. + * We still need to call get_shard_no() to check if shard map is up-to-date. + */ + if (((NeonRequest *) req)->tag != T_NeonGetPageRequest || ((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM) + { + shard_no = 0; + } do { - while (!page_server->send((NeonRequest *) req) || !page_server->flush()); - MyPState->ring_flush = MyPState->ring_unused; + while (!page_server->send(shard_no, (NeonRequest *) req) || !page_server->flush(shard_no)); consume_prefetch_responses(); - resp = page_server->receive(); + resp = page_server->receive(shard_no); } while (resp == NULL); return resp; @@ -2098,8 +2158,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, case T_NeonErrorResponse: ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", - blkno, + errmsg(NEON_TAG "[shard %d] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", + slot->shard_no, blkno, RelFileInfoFmt(rinfo), forkNum, (uint32) (request_lsn >> 32), (uint32) request_lsn),