diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 6725ce8fff..21db666caa 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -308,13 +308,13 @@ lfc_change_limit_hook(int newval, void *extra) Assert(victim->access_count == 0); #ifdef FALLOC_FL_PUNCH_HOLE if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0) - elog(LOG, "Failed to punch hole in file: %m"); + neon_log(LOG, "Failed to punch hole in file: %m"); #endif hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); lfc_ctl->used -= 1; } lfc_ctl->limit = new_size; - elog(DEBUG1, "set local file cache limit to %d", new_size); + neon_log(DEBUG1, "set local file cache limit to %d", new_size); LWLockRelease(lfc_lock); } @@ -327,7 +327,7 @@ lfc_init(void) * shared_preload_libraries. */ if (!process_shared_preload_libraries_in_progress) - elog(ERROR, "Neon module should be loaded via shared_preload_libraries"); + neon_log(ERROR, "Neon module should be loaded via shared_preload_libraries"); DefineCustomIntVariable("neon.max_file_cache_size", @@ -643,7 +643,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void Assert(victim->access_count == 0); entry->offset = victim->offset; /* grab victim's chunk */ hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); - elog(DEBUG2, "Swap file cache page"); + neon_log(DEBUG2, "Swap file cache page"); } else { @@ -846,10 +846,10 @@ local_cache_pages(PG_FUNCTION_ARGS) * wrong) function definition though. */ if (get_call_result_type(fcinfo, NULL, &expected_tupledesc) != TYPEFUNC_COMPOSITE) - elog(ERROR, "return type must be a row type"); + neon_log(ERROR, "return type must be a row type"); if (expected_tupledesc->natts != NUM_LOCALCACHE_PAGES_ELEM) - elog(ERROR, "incorrect number of output arguments"); + neon_log(ERROR, "incorrect number of output arguments"); /* Construct a tuple descriptor for the result rows. */ tupledesc = CreateTemplateTupleDesc(expected_tupledesc->natts); diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index e58c28d7d5..d79e42b873 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -15,6 +15,7 @@ #include "postgres.h" #include "access/xlog.h" +#include "common/hashfn.h" #include "fmgr.h" #include "libpq-fe.h" #include "libpq/libpq.h" @@ -38,17 +39,6 @@ #define MIN_RECONNECT_INTERVAL_USEC 100 #define MAX_RECONNECT_INTERVAL_USEC 1000000 -bool connected = false; -PGconn *pageserver_conn = NULL; - -/* - * WaitEventSet containing: - * - WL_SOCKET_READABLE on pageserver_conn, - * - WL_LATCH_SET on MyLatch, and - * - WL_EXIT_ON_PM_DEATH. - */ -WaitEventSet *pageserver_conn_wes = NULL; - /* GUCs */ char *neon_timeline; char *neon_tenant; @@ -59,92 +49,203 @@ char *neon_auth_token; int readahead_buffer_size = 128; int flush_every_n_requests = 8; -static int n_reconnect_attempts = 0; -static int max_reconnect_attempts = 60; +static int n_reconnect_attempts = 0; +static int max_reconnect_attempts = 60; +static int stripe_size; -#define MAX_PAGESERVER_CONNSTRING_SIZE 256 +bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL; + +static bool pageserver_flush(shardno_t shard_no); +static void pageserver_disconnect(shardno_t shard_no); +static void AssignPageserverConnstring(const char *newval, void *extra); +static bool CheckPageserverConnstring(char **newval, void **extra, GucSource source); + +static shmem_startup_hook_type prev_shmem_startup_hook; +#if PG_VERSION_NUM>=150000 +static shmem_request_hook_type prev_shmem_request_hook; +#endif + +/* + * ShardMap is kept in shared memory. It contains the connection strings for + * each shard. + * + * There is "neon.pageserver_connstring" GUC with PGC_SIGHUP option, allowing to change it using + * pg_reload_conf(). It is used by control plane to update shards information if page server is crashed, + * relocated or new shards are added. This GUC variable contains comma separated list of connection strings. + * It is copied to shared memory because config can not be loaded during query execution and we need to + * reestablish connection to page server. + * + * Copying connection string to shared memory is done by postmaster. And other backends + * should check update counter to determine of connection URL is changed and connection needs to be reestablished. + * We can not use standard Postgres LW-locks, because postmaster has proc entry and so can not wait + * on this primitive. This is why lockless access algorithm is implemented using two atomic counters to enforce + * consistent reading of connection string value from shared memory. + */ +typedef struct +{ + size_t n_shards; + pg_atomic_uint64 begin_update_counter; + pg_atomic_uint64 end_update_counter; + char shard_connstr[MAX_SHARDS][MAX_PS_CONNSTR_LEN]; +} ShardMap; + + +static ShardMap* shard_map; +static uint64 shard_map_update_counter; typedef struct { - LWLockId lock; - pg_atomic_uint64 update_counter; - char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE]; -} PagestoreShmemState; + /* + * Connection for each shard + */ + PGconn *conn; + /* + * WaitEventSet containing: + * - WL_SOCKET_READABLE on 'conn' + * - WL_LATCH_SET on MyLatch, and + * - WL_EXIT_ON_PM_DEATH. + */ + WaitEventSet *wes; +} PageServer; -#if PG_VERSION_NUM >= 150000 -static shmem_request_hook_type prev_shmem_request_hook = NULL; -static void walproposer_shmem_request(void); +static PageServer page_servers[MAX_SHARDS]; + +static void +psm_shmem_startup(void) +{ + bool found; + if (prev_shmem_startup_hook) + { + prev_shmem_startup_hook(); + } + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + shard_map = (ShardMap*)ShmemInitStruct("shard_map", sizeof(ShardMap), &found); + if (!found) + { + shard_map->n_shards = 0; + pg_atomic_init_u64(&shard_map->begin_update_counter, 0); + pg_atomic_init_u64(&shard_map->end_update_counter, 0); + AssignPageserverConnstring(page_server_connstring, NULL); + } + LWLockRelease(AddinShmemInitLock); +} + +static void +psm_shmem_request(void) +{ +#if PG_VERSION_NUM>=150000 + if (prev_shmem_request_hook) + prev_shmem_request_hook(); #endif -static shmem_startup_hook_type prev_shmem_startup_hook; -static PagestoreShmemState *pagestore_shared; -static uint64 pagestore_local_counter = 0; -static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE]; -static bool pageserver_flush(void); -static void pageserver_disconnect(void); - -static bool -PagestoreShmemIsValid() -{ - return pagestore_shared && UsedShmemSegAddr; -} - -static bool -CheckPageserverConnstring(char **newval, void **extra, GucSource source) -{ - return strlen(*newval) < MAX_PAGESERVER_CONNSTRING_SIZE; + RequestAddinShmemSpace(sizeof(ShardMap)); } static void -AssignPageserverConnstring(const char *newval, void *extra) +psm_init(void) { - if (!PagestoreShmemIsValid()) - return; - LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE); - strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE); - pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1); - LWLockRelease(pagestore_shared->lock); + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = psm_shmem_startup; +#if PG_VERSION_NUM>=150000 + prev_shmem_request_hook = shmem_request_hook; + shmem_request_hook = psm_shmem_request; +#else + psm_shmem_request(); +#endif +} + +/* + * Reload shard map if needed and return number of shards and connection string for the specified shard + * 'connstr' is an output buffer. If not NULL, it must point to a buffer at least MAX_PS_CONNSTR_LEN bytes + * long. The connection string for the gven shard is copied to it. + */ +static shardno_t +load_shard_map(shardno_t shard_no, char* connstr) +{ + shardno_t n_shards; + uint64 begin_update_counter; + uint64 end_update_counter; + + /* + * There is race condition here between backend and postmaster which can update shard map. + * We recheck update counter after copying shard map to check that configuration was not changed. + */ + do + { + begin_update_counter = pg_atomic_read_u64(&shard_map->begin_update_counter); + end_update_counter = pg_atomic_read_u64(&shard_map->end_update_counter); + + n_shards = shard_map->n_shards; + if (shard_no >= n_shards) + neon_log(ERROR, "Shard %d is greater or equal than number of shards %d", shard_no, n_shards); + + if (connstr) + { + strlcpy(connstr, shard_map->shard_connstr[shard_no], MAX_PS_CONNSTR_LEN); + pg_memory_barrier(); + } + + } + while (begin_update_counter != end_update_counter + || begin_update_counter != pg_atomic_read_u64(&shard_map->begin_update_counter) + || end_update_counter != pg_atomic_read_u64(&shard_map->end_update_counter)); + + + if (shard_map_update_counter != end_update_counter) + { + /* Reset all connections if connection strings are changed */ + for (shardno_t i = 0; i < MAX_SHARDS; i++) + { + if (page_servers[i].conn) + pageserver_disconnect(i); + } + shard_map_update_counter = end_update_counter; + } + + return n_shards; +} + +#define MB (1024*1024) + +shardno_t +get_shard_number(BufferTag* tag) +{ + shardno_t n_shards = load_shard_map(0, NULL); + uint32 hash; + +#if PG_MAJORVERSION_NUM < 16 + hash = murmurhash32(tag->rnode.relNode); + hash = hash_combine(hash, murmurhash32(tag->blockNum/stripe_size)); +#else + hash = murmurhash32(tag->relNumber); + hash = hash_combine(hash, murmurhash32(tag->blockNum/stripe_size)); +#endif + + return hash % n_shards; } static bool -CheckConnstringUpdated() -{ - if (!PagestoreShmemIsValid()) - return false; - return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter); -} - -static void -ReloadConnstring() -{ - if (!PagestoreShmemIsValid()) - return; - LWLockAcquire(pagestore_shared->lock, LW_SHARED); - strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring)); - pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter); - LWLockRelease(pagestore_shared->lock); -} - -static bool -pageserver_connect(int elevel) +pageserver_connect(shardno_t shard_no, int elevel) { char *query; int ret; const char *keywords[3]; const char *values[3]; int n; + PGconn* conn; + WaitEventSet *wes; + char connstr[MAX_PS_CONNSTR_LEN]; static TimestampTz last_connect_time = 0; static uint64_t delay_us = MIN_RECONNECT_INTERVAL_USEC; TimestampTz now; uint64_t us_since_last_connect; - Assert(!connected); + Assert(page_servers[shard_no].conn == NULL); - if (CheckConnstringUpdated()) - { - ReloadConnstring(); - } + (void)load_shard_map(shard_no, connstr); /* refresh shard map if needed */ now = GetCurrentTimestamp(); us_since_last_connect = now - last_connect_time; @@ -180,76 +281,84 @@ pageserver_connect(int elevel) n++; } keywords[n] = "dbname"; - values[n] = local_pageserver_connstring; + values[n] = connstr; n++; keywords[n] = NULL; values[n] = NULL; n++; - pageserver_conn = PQconnectdbParams(keywords, values, 1); + conn = PQconnectdbParams(keywords, values, 1); - if (PQstatus(pageserver_conn) == CONNECTION_BAD) + if (PQstatus(conn) == CONNECTION_BAD) { - char *msg = pchomp(PQerrorMessage(pageserver_conn)); + char *msg = pchomp(PQerrorMessage(conn)); - PQfinish(pageserver_conn); - pageserver_conn = NULL; + PQfinish(conn); ereport(elevel, (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), - errmsg(NEON_TAG "could not establish connection to pageserver"), + errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no), errdetail_internal("%s", msg))); + pfree(msg); return false; } - query = psprintf("pagestream %s %s", neon_tenant, neon_timeline); - ret = PQsendQuery(pageserver_conn, query); + ret = PQsendQuery(conn, query); + pfree(query); if (ret != 1) { - PQfinish(pageserver_conn); - pageserver_conn = NULL; - neon_log(elevel, "could not send pagestream command to pageserver"); + PQfinish(conn); + neon_shard_log(shard_no, elevel, "could not send pagestream command to pageserver"); return false; } - pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3); - AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET, + wes = CreateWaitEventSet(TopMemoryContext, 3); + AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); - AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, + AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL); - AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL); + AddWaitEventToSet(wes, WL_SOCKET_READABLE, PQsocket(conn), NULL, NULL); - while (PQisBusy(pageserver_conn)) + PG_TRY(); { - WaitEvent event; - - /* Sleep until there's something to do */ - (void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION); - ResetLatch(MyLatch); - - CHECK_FOR_INTERRUPTS(); - - /* Data available in socket? */ - if (event.events & WL_SOCKET_READABLE) + while (PQisBusy(conn)) { - if (!PQconsumeInput(pageserver_conn)) + WaitEvent event; + + /* Sleep until there's something to do */ + (void) WaitEventSetWait(wes, -1L, &event, 1, PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Data available in socket? */ + if (event.events & WL_SOCKET_READABLE) { - char *msg = pchomp(PQerrorMessage(pageserver_conn)); + if (!PQconsumeInput(conn)) + { + char *msg = pchomp(PQerrorMessage(conn)); - PQfinish(pageserver_conn); - pageserver_conn = NULL; - FreeWaitEventSet(pageserver_conn_wes); - pageserver_conn_wes = NULL; + PQfinish(conn); + FreeWaitEventSet(wes); - neon_log(elevel, "could not complete handshake with pageserver: %s", - msg); - return false; + neon_shard_log(shard_no, elevel, "could not complete handshake with pageserver: %s", + msg); + return false; + } } } } + PG_CATCH(); + { + PQfinish(conn); + FreeWaitEventSet(wes); + PG_RE_THROW(); + } + PG_END_TRY(); - neon_log(LOG, "libpagestore: connected to '%s'", page_server_connstring); + neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s'", connstr); + page_servers[shard_no].conn = conn; + page_servers[shard_no].wes = wes; - connected = true; return true; } @@ -257,10 +366,10 @@ pageserver_connect(int elevel) * A wrapper around PQgetCopyData that checks for interrupts while sleeping. */ static int -call_PQgetCopyData(char **buffer) +call_PQgetCopyData(shardno_t shard_no, char **buffer) { int ret; - + PGconn* pageserver_conn = page_servers[shard_no].conn; retry: ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ ); @@ -269,7 +378,7 @@ retry: WaitEvent event; /* Sleep until there's something to do */ - (void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION); + (void) WaitEventSetWait(page_servers[shard_no].wes, -1L, &event, 1, PG_WAIT_EXTENSION); ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); @@ -281,7 +390,7 @@ retry: { char *msg = pchomp(PQerrorMessage(pageserver_conn)); - neon_log(LOG, "could not get response from pageserver: %s", msg); + neon_shard_log(shard_no, LOG, "could not get response from pageserver: %s", msg); pfree(msg); return -1; } @@ -295,7 +404,7 @@ retry: static void -pageserver_disconnect(void) +pageserver_disconnect(shardno_t shard_no) { /* * If anything goes wrong while we were sending a request, it's not clear @@ -304,38 +413,36 @@ pageserver_disconnect(void) * time later after we have already sent a new unrelated request. Close * the connection to avoid getting confused. */ - if (connected) + if (page_servers[shard_no].conn) { - neon_log(LOG, "dropping connection to page server due to error"); - PQfinish(pageserver_conn); - pageserver_conn = NULL; - connected = false; + neon_shard_log(shard_no, LOG, "dropping connection to page server due to error"); + PQfinish(page_servers[shard_no].conn); + page_servers[shard_no].conn = NULL; + /* + * If the connection to any pageserver is lost, we throw away the whole prefetch queue, even for other pageservers. + * It should not cause big problems, because connection loss is supposed to be a rare event. + */ prefetch_on_ps_disconnect(); } - if (pageserver_conn_wes != NULL) + if (page_servers[shard_no].wes != NULL) { - FreeWaitEventSet(pageserver_conn_wes); - pageserver_conn_wes = NULL; + FreeWaitEventSet(page_servers[shard_no].wes); + page_servers[shard_no].wes = NULL; } } static bool -pageserver_send(NeonRequest *request) +pageserver_send(shardno_t shard_no, NeonRequest *request) { StringInfoData req_buff; - - if (CheckConnstringUpdated()) - { - pageserver_disconnect(); - ReloadConnstring(); - } + PGconn* pageserver_conn = page_servers[shard_no].conn; /* If the connection was lost for some reason, reconnect */ - if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD) + if (pageserver_conn && PQstatus(pageserver_conn) == CONNECTION_BAD) { - neon_log(LOG, "pageserver_send disconnect bad connection"); - pageserver_disconnect(); + neon_shard_log(shard_no, LOG, "pageserver_send disconnect bad connection"); + pageserver_disconnect(shard_no); } req_buff = nm_pack_request(request); @@ -349,9 +456,9 @@ pageserver_send(NeonRequest *request) * https://github.com/neondatabase/neon/issues/1138 So try to reestablish * connection in case of failure. */ - if (!connected) + if (!page_servers[shard_no].conn) { - while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR)) + while (!pageserver_connect(shard_no, n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR)) { HandleMainLoopInterrupts(); n_reconnect_attempts += 1; @@ -359,7 +466,9 @@ pageserver_send(NeonRequest *request) n_reconnect_attempts = 0; } - /* + pageserver_conn = page_servers[shard_no].conn; + + /* * Send request. * * In principle, this could block if the output buffer is full, and we @@ -370,9 +479,8 @@ pageserver_send(NeonRequest *request) if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0) { char *msg = pchomp(PQerrorMessage(pageserver_conn)); - - pageserver_disconnect(); - neon_log(LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg); + pageserver_disconnect(shard_no); + neon_shard_log(shard_no, LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg); pfree(msg); pfree(req_buff.data); return false; @@ -384,19 +492,19 @@ pageserver_send(NeonRequest *request) { char *msg = nm_to_string((NeonMessage *) request); - neon_log(PageStoreTrace, "sent request: %s", msg); + neon_shard_log(shard_no, PageStoreTrace, "sent request: %s", msg); pfree(msg); } return true; } static NeonResponse * -pageserver_receive(void) +pageserver_receive(shardno_t shard_no) { StringInfoData resp_buff; NeonResponse *resp; - - if (!connected) + PGconn* pageserver_conn = page_servers[shard_no].conn; + if (!pageserver_conn) return NULL; PG_TRY(); @@ -404,7 +512,7 @@ pageserver_receive(void) /* read response */ int rc; - rc = call_PQgetCopyData(&resp_buff.data); + rc = call_PQgetCopyData(shard_no, &resp_buff.data); if (rc >= 0) { resp_buff.len = rc; @@ -416,33 +524,33 @@ pageserver_receive(void) { char *msg = nm_to_string((NeonMessage *) resp); - neon_log(PageStoreTrace, "got response: %s", msg); + neon_shard_log(shard_no, PageStoreTrace, "got response: %s", msg); pfree(msg); } } else if (rc == -1) { - neon_log(LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn))); - pageserver_disconnect(); + neon_shard_log(shard_no, LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn))); + pageserver_disconnect(shard_no); resp = NULL; } else if (rc == -2) { char *msg = pchomp(PQerrorMessage(pageserver_conn)); - pageserver_disconnect(); - neon_log(ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg); + pageserver_disconnect(shard_no); + neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg); } else { - pageserver_disconnect(); - neon_log(ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc); + pageserver_disconnect(shard_no); + neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc); } } PG_CATCH(); { - neon_log(LOG, "pageserver_receive disconnect due to caught exception"); - pageserver_disconnect(); + neon_shard_log(shard_no, LOG, "pageserver_receive disconnect due to caught exception"); + pageserver_disconnect(shard_no); PG_RE_THROW(); } PG_END_TRY(); @@ -452,11 +560,12 @@ pageserver_receive(void) static bool -pageserver_flush(void) +pageserver_flush(shardno_t shard_no) { - if (!connected) + PGconn* pageserver_conn = page_servers[shard_no].conn; + if (!pageserver_conn) { - neon_log(WARNING, "Tried to flush while disconnected"); + neon_shard_log(shard_no, WARNING, "Tried to flush while disconnected"); } else { @@ -464,8 +573,8 @@ pageserver_flush(void) { char *msg = pchomp(PQerrorMessage(pageserver_conn)); - pageserver_disconnect(); - neon_log(LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg); + pageserver_disconnect(shard_no); + neon_shard_log(shard_no, LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg); pfree(msg); return false; } @@ -488,63 +597,82 @@ check_neon_id(char **newval, void **extra, GucSource source) return **newval == '\0' || HexDecodeString(id, *newval, 16); } -static Size -PagestoreShmemSize(void) -{ - return sizeof(PagestoreShmemState); -} - static bool -PagestoreShmemInit(void) +CheckPageserverConnstring(char **newval, void **extra, GucSource source) { - bool found; - - LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - pagestore_shared = ShmemInitStruct("libpagestore shared state", - PagestoreShmemSize(), - &found); - if (!found) + const char* shard_connstr = *newval; + const char* sep; + size_t connstr_len; + int i = 0; + do { - pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock); - pg_atomic_init_u64(&pagestore_shared->update_counter, 0); - AssignPageserverConnstring(page_server_connstring, NULL); + sep = strchr(shard_connstr, ','); + connstr_len = sep != NULL ? sep - shard_connstr : strlen(shard_connstr); + if (connstr_len == 0 && sep == NULL) + break; /* trailing comma */ + if (i >= MAX_SHARDS) + { + neon_log(LOG, "Too many shards"); + return false; + } + if (connstr_len >= MAX_PS_CONNSTR_LEN) + { + neon_log(LOG, "Connection string too long"); + return false; + } + shard_connstr = sep + 1; + i += 1; + } while (sep != NULL); + + return true; +} + +static void +AssignPageserverConnstring(const char *newval, void *extra) +{ + /* + * Load shard map only at Postmaster. + * If old page server is not available, then backends can be blocked in attempts to reconnect to it and do not reload config in this loop + * + * Copying GUC value to shared memory is usually performed by postmaster. + */ + if (shard_map != NULL && UsedShmemSegAddr != NULL && MyProcPid == PostmasterPid) + { + const char* shard_connstr = newval; + const char* sep; + size_t connstr_len; + int i = 0; + bool shard_map_changed = false; + do + { + sep = strchr(shard_connstr, ','); + connstr_len = sep != NULL ? sep - shard_connstr : strlen(shard_connstr); + if (connstr_len == 0) + break; /* trailing comma */ + Assert(i < MAX_SHARDS); + Assert(connstr_len < MAX_PS_CONNSTR_LEN); + if (i >= shard_map->n_shards || + strcmp(shard_map->shard_connstr[i], shard_connstr) != 0) + { + if (!shard_map_changed) + { + pg_atomic_add_fetch_u64(&shard_map->begin_update_counter, 1); + shard_map_changed = true; + } + memcpy(shard_map->shard_connstr[i], shard_connstr, connstr_len); + shard_map->shard_connstr[i][connstr_len] = '\0'; + } + shard_connstr = sep + 1; + i += 1; + } while (sep != NULL); + + if (shard_map_changed) + { + shard_map->n_shards = i; + pg_memory_barrier(); + pg_atomic_add_fetch_u64(&shard_map->end_update_counter, 1); + } } - LWLockRelease(AddinShmemInitLock); - return found; -} - -static void -pagestore_shmem_startup_hook(void) -{ - if (prev_shmem_startup_hook) - prev_shmem_startup_hook(); - - PagestoreShmemInit(); -} - -static void -pagestore_shmem_request(void) -{ -#if PG_VERSION_NUM >= 150000 - if (prev_shmem_request_hook) - prev_shmem_request_hook(); -#endif - - RequestAddinShmemSpace(PagestoreShmemSize()); - RequestNamedLWLockTranche("neon_libpagestore", 1); -} - -static void -pagestore_prepare_shmem(void) -{ -#if PG_VERSION_NUM >= 150000 - prev_shmem_request_hook = shmem_request_hook; - shmem_request_hook = pagestore_shmem_request; -#else - pagestore_shmem_request(); -#endif - prev_shmem_startup_hook = shmem_startup_hook; - shmem_startup_hook = pagestore_shmem_startup_hook; } /* @@ -553,8 +681,6 @@ pagestore_prepare_shmem(void) void pg_init_libpagestore(void) { - pagestore_prepare_shmem(); - DefineCustomStringVariable("neon.pageserver_connstring", "connection string to the page server", NULL, @@ -582,6 +708,15 @@ pg_init_libpagestore(void) 0, /* no flags required */ check_neon_id, NULL, NULL); + DefineCustomIntVariable("neon.stripe_size", + "sharding stripe size", + NULL, + &stripe_size, + 32768, 1, INT_MAX, + PGC_SIGHUP, + GUC_UNIT_BLOCKS, + NULL, NULL, NULL); + DefineCustomIntVariable("neon.max_cluster_size", "cluster size limit", NULL, @@ -645,4 +780,5 @@ pg_init_libpagestore(void) } lfc_init(); + psm_init(); } diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 3fcaab0bee..64dc3dd7c8 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -17,12 +17,20 @@ #include "access/xlogdefs.h" #include RELFILEINFO_HDR +#include "storage/block.h" +#include "storage/smgr.h" +#include "storage/buf_internals.h" #include "lib/stringinfo.h" #include "libpq/pqformat.h" #include "storage/block.h" #include "storage/smgr.h" #include "utils/memutils.h" +#include "pg_config.h" + +#define MAX_SHARDS 128 +#define MAX_PS_CONNSTR_LEN 128 + typedef enum { /* pagestore_client -> pagestore */ @@ -51,6 +59,9 @@ typedef struct #define neon_log(tag, fmt, ...) ereport(tag, \ (errmsg(NEON_TAG fmt, ##__VA_ARGS__), \ errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0))) +#define neon_shard_log(shard_no, tag, fmt, ...) ereport(tag, \ + (errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \ + errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0))) /* * supertype of all the Neon*Request structs below @@ -141,11 +152,13 @@ extern char *nm_to_string(NeonMessage *msg); * API */ +typedef unsigned shardno_t; + typedef struct { - bool (*send) (NeonRequest *request); - NeonResponse *(*receive) (void); - bool (*flush) (void); + bool (*send) (shardno_t shard_no, NeonRequest * request); + NeonResponse *(*receive) (shardno_t shard_no); + bool (*flush) (shardno_t shard_no); } page_server_api; extern void prefetch_on_ps_disconnect(void); @@ -159,6 +172,8 @@ extern char *neon_timeline; extern char *neon_tenant; extern int32 max_cluster_size; +extern shardno_t get_shard_number(BufferTag* tag); + extern const f_smgr *smgr_neon(BackendId backend, NRelFileInfo rinfo); extern void smgr_init_neon(void); extern void readahead_buffer_resize(int newsize, void *extra); diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 8888cd89c6..a2cc3ffe03 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -172,6 +172,7 @@ typedef struct PrefetchRequest XLogRecPtr actual_request_lsn; NeonResponse *response; /* may be null */ PrefetchStatus status; + shardno_t shard_no; uint64 my_ring_index; } PrefetchRequest; @@ -239,7 +240,9 @@ typedef struct PrefetchState * also unused */ /* the buffers */ - prfh_hash *prf_hash; + prfh_hash *prf_hash; + int max_shard_no; + uint8 shard_bitmap[(MAX_SHARDS + 7)/8]; PrefetchRequest prf_buffer[]; /* prefetch buffers */ } PrefetchState; @@ -327,6 +330,7 @@ compact_prefetch_buffers(void) Assert(target_slot->status == PRFS_UNUSED); target_slot->buftag = source_slot->buftag; + target_slot->shard_no = source_slot->shard_no; target_slot->status = source_slot->status; target_slot->response = source_slot->response; target_slot->effective_request_lsn = source_slot->effective_request_lsn; @@ -494,6 +498,23 @@ prefetch_cleanup_trailing_unused(void) } } + +static bool +prefetch_flush_requests(void) +{ + for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++) + { + if (MyPState->shard_bitmap[shard_no >> 3] & (1 << (shard_no & 7))) + { + if (!page_server->flush(shard_no)) + return false; + MyPState->shard_bitmap[shard_no >> 3] &= ~(1 << (shard_no & 7)); + } + } + MyPState->max_shard_no = 0; + return true; +} + /* * Wait for slot of ring_index to have received its response. * The caller is responsible for making sure the request buffer is flushed. @@ -509,7 +530,7 @@ prefetch_wait_for(uint64 ring_index) if (MyPState->ring_flush <= ring_index && MyPState->ring_unused > MyPState->ring_flush) { - if (!page_server->flush()) + if (!prefetch_flush_requests()) return false; MyPState->ring_flush = MyPState->ring_unused; } @@ -547,7 +568,7 @@ prefetch_read(PrefetchRequest *slot) Assert(slot->my_ring_index == MyPState->ring_receive); old = MemoryContextSwitchTo(MyPState->errctx); - response = (NeonResponse *) page_server->receive(); + response = (NeonResponse *) page_server->receive(slot->shard_no); MemoryContextSwitchTo(old); if (response) { @@ -704,12 +725,14 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force Assert(slot->response == NULL); Assert(slot->my_ring_index == MyPState->ring_unused); - while (!page_server->send((NeonRequest *) &request)); + while (!page_server->send(slot->shard_no, (NeonRequest *) &request)); /* update prefetch state */ MyPState->n_requests_inflight += 1; MyPState->n_unused -= 1; MyPState->ring_unused += 1; + MyPState->shard_bitmap[slot->shard_no >> 3] |= 1 << (slot->shard_no & 7); + MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no); /* update slot state */ slot->status = PRFS_REQUESTED; @@ -880,6 +903,7 @@ Retry: * function reads the buffer tag from the slot. */ slot->buftag = tag; + slot->shard_no = get_shard_number(&tag); slot->my_ring_index = ring_index; prefetch_do_request(slot, force_latest, force_lsn); @@ -890,7 +914,7 @@ Retry: if (flush_every_n_requests > 0 && MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests) { - if (!page_server->flush()) + if (!prefetch_flush_requests()) { /* * Prefetch set is reset in case of error, so we should try to @@ -908,13 +932,44 @@ static NeonResponse * page_server_request(void const *req) { NeonResponse *resp; + BufferTag tag = {0}; + shardno_t shard_no; + + switch (((NeonRequest *) req)->tag) + { + case T_NeonExistsRequest: + CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo); + break; + case T_NeonNblocksRequest: + CopyNRelFileInfoToBufTag(tag, ((NeonNblocksRequest *) req)->rinfo); + break; + case T_NeonDbSizeRequest: + NInfoGetDbOid(BufTagGetNRelFileInfo(tag)) = ((NeonDbSizeRequest *) req)->dbNode; + break; + case T_NeonGetPageRequest: + CopyNRelFileInfoToBufTag(tag, ((NeonGetPageRequest *) req)->rinfo); + tag.blockNum = ((NeonGetPageRequest *) req)->blkno; + break; + default: + neon_log(ERROR, "Unexpected request tag: %d", ((NeonRequest *) req)->tag); + } + shard_no = get_shard_number(&tag); + + + /* + * Current sharding model assumes that all metadata is present only at shard 0. + * We still need to call get_shard_no() to check if shard map is up-to-date. + */ + if (((NeonRequest *) req)->tag != T_NeonGetPageRequest || ((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM) + { + shard_no = 0; + } do { - while (!page_server->send((NeonRequest *) req) || !page_server->flush()); - MyPState->ring_flush = MyPState->ring_unused; + while (!page_server->send(shard_no, (NeonRequest *) req) || !page_server->flush(shard_no)); consume_prefetch_responses(); - resp = page_server->receive(); + resp = page_server->receive(shard_no); } while (resp == NULL); return resp; @@ -990,7 +1045,7 @@ nm_pack_request(NeonRequest *msg) case T_NeonErrorResponse: case T_NeonDbSizeResponse: default: - elog(ERROR, "unexpected neon message tag 0x%02x", msg->tag); + neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag); break; } return s; @@ -1085,7 +1140,7 @@ nm_unpack_response(StringInfo s) case T_NeonGetPageRequest: case T_NeonDbSizeRequest: default: - elog(ERROR, "unexpected neon message tag 0x%02x", tag); + neon_log(ERROR, "unexpected neon message tag 0x%02x", tag); break; } @@ -1277,7 +1332,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co XLogFlush(recptr); lsn = recptr; ereport(SmgrTrace, - (errmsg("Page %u of relation %u/%u/%u.%u was force logged. Evicted at lsn=%X/%X", + (errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u was force logged. Evicted at lsn=%X/%X", blocknum, RelFileInfoFmt(InfoFromSMgrRel(reln)), forknum, LSN_FORMAT_ARGS(lsn)))); @@ -1305,7 +1360,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co if (PageIsNew((Page) buffer)) { ereport(SmgrTrace, - (errmsg("Page %u of relation %u/%u/%u.%u is all-zeros", + (errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is all-zeros", blocknum, RelFileInfoFmt(InfoFromSMgrRel(reln)), forknum))); @@ -1313,7 +1368,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co else if (PageIsEmptyHeapPage((Page) buffer)) { ereport(SmgrTrace, - (errmsg("Page %u of relation %u/%u/%u.%u is an empty heap page with no LSN", + (errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is an empty heap page with no LSN", blocknum, RelFileInfoFmt(InfoFromSMgrRel(reln)), forknum))); @@ -1321,7 +1376,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co else { ereport(PANIC, - (errmsg("Page %u of relation %u/%u/%u.%u is evicted with zero LSN", + (errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is evicted with zero LSN", blocknum, RelFileInfoFmt(InfoFromSMgrRel(reln)), forknum))); @@ -1330,7 +1385,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co else { ereport(SmgrTrace, - (errmsg("Page %u of relation %u/%u/%u.%u is already wal logged at lsn=%X/%X", + (errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is already wal logged at lsn=%X/%X", blocknum, RelFileInfoFmt(InfoFromSMgrRel(reln)), forknum, LSN_FORMAT_ARGS(lsn)))); @@ -1430,7 +1485,7 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block lsn = GetLastWrittenLSN(rinfo, forknum, blkno); lsn = nm_adjust_lsn(lsn); - elog(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ", + neon_log(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ", (uint32) ((lsn) >> 32), (uint32) (lsn)); } else @@ -1445,7 +1500,7 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block *latest = true; lsn = GetLastWrittenLSN(rinfo, forknum, blkno); Assert(lsn != InvalidXLogRecPtr); - elog(DEBUG1, "neon_get_request_lsn GetLastWrittenLSN lsn %X/%X ", + neon_log(DEBUG1, "neon_get_request_lsn GetLastWrittenLSN lsn %X/%X ", (uint32) ((lsn) >> 32), (uint32) (lsn)); lsn = nm_adjust_lsn(lsn); @@ -1465,7 +1520,7 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block #endif if (lsn > flushlsn) { - elog(DEBUG5, "last-written LSN %X/%X is ahead of last flushed LSN %X/%X", + neon_log(DEBUG5, "last-written LSN %X/%X is ahead of last flushed LSN %X/%X", (uint32) (lsn >> 32), (uint32) lsn, (uint32) (flushlsn >> 32), (uint32) flushlsn); XLogFlush(lsn); @@ -1509,7 +1564,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum) return mdexists(reln, forkNum); default: - elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); + neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } if (get_cached_relsize(InfoFromSMgrRel(reln), forkNum, &n_blocks)) @@ -1561,7 +1616,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum) case T_NeonErrorResponse: ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X", + errmsg(NEON_TAG "could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X", RelFileInfoFmt(InfoFromSMgrRel(reln)), forkNum, (uint32) (request_lsn >> 32), (uint32) request_lsn), @@ -1570,7 +1625,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum) break; default: - elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag); + neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag); } pfree(resp); return exists; @@ -1587,7 +1642,7 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo) switch (reln->smgr_relpersistence) { case 0: - elog(ERROR, "cannot call smgrcreate() on rel with unknown persistence"); + neon_log(ERROR, "cannot call smgrcreate() on rel with unknown persistence"); case RELPERSISTENCE_PERMANENT: break; @@ -1598,10 +1653,10 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo) return; default: - elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); + neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - elog(SmgrTrace, "Create relation %u/%u/%u.%u", + neon_log(SmgrTrace, "Create relation %u/%u/%u.%u", RelFileInfoFmt(InfoFromSMgrRel(reln)), forkNum); @@ -1696,7 +1751,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, switch (reln->smgr_relpersistence) { case 0: - elog(ERROR, "cannot call smgrextend() on rel with unknown persistence"); + neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence"); case RELPERSISTENCE_PERMANENT: break; @@ -1707,7 +1762,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, return; default: - elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); + neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } /* @@ -1726,7 +1781,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024) ereport(ERROR, (errcode(ERRCODE_DISK_FULL), - errmsg("could not extend file because project size limit (%d MB) has been exceeded", + errmsg(NEON_TAG "could not extend file because project size limit (%d MB) has been exceeded", max_cluster_size), errhint("This limit is defined externally by the project size limit, and internally by neon.max_cluster_size GUC"))); } @@ -1745,7 +1800,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, set_cached_relsize(InfoFromSMgrRel(reln), forkNum, blkno + 1); lsn = PageGetLSN((Page) buffer); - elog(SmgrTrace, "smgrextend called for %u/%u/%u.%u blk %u, page LSN: %X/%08X", + neon_log(SmgrTrace, "smgrextend called for %u/%u/%u.%u blk %u, page LSN: %X/%08X", RelFileInfoFmt(InfoFromSMgrRel(reln)), forkNum, blkno, (uint32) (lsn >> 32), (uint32) lsn); @@ -1785,7 +1840,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum, switch (reln->smgr_relpersistence) { case 0: - elog(ERROR, "cannot call smgrextend() on rel with unknown persistence"); + neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence"); case RELPERSISTENCE_PERMANENT: break; @@ -1796,7 +1851,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum, return; default: - elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); + neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } if (max_cluster_size > 0 && @@ -1808,7 +1863,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum, if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024) ereport(ERROR, (errcode(ERRCODE_DISK_FULL), - errmsg("could not extend file because cluster size limit (%d MB) has been exceeded", + errmsg(NEON_TAG "could not extend file because cluster size limit (%d MB) has been exceeded", max_cluster_size), errhint("This limit is defined by neon.max_cluster_size GUC"))); } @@ -1821,7 +1876,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum, if ((uint64) blocknum + nblocks >= (uint64) InvalidBlockNumber) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), - errmsg("cannot extend file \"%s\" beyond %u blocks", + errmsg(NEON_TAG "cannot extend file \"%s\" beyond %u blocks", relpath(reln->smgr_rlocator, forkNum), InvalidBlockNumber))); @@ -1882,7 +1937,7 @@ neon_open(SMgrRelation reln) mdopen(reln); /* no work */ - elog(SmgrTrace, "[NEON_SMGR] open noop"); + neon_log(SmgrTrace, "open noop"); } /* @@ -1919,7 +1974,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) return mdprefetch(reln, forknum, blocknum); default: - elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); + neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } if (lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum)) @@ -1964,11 +2019,11 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum, return; default: - elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); + neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } /* not implemented */ - elog(SmgrTrace, "[NEON_SMGR] writeback noop"); + neon_log(SmgrTrace, "writeback noop"); #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) @@ -2098,8 +2153,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, case T_NeonErrorResponse: ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", - blkno, + errmsg(NEON_TAG "[shard %d] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", + slot->shard_no, blkno, RelFileInfoFmt(rinfo), forkNum, (uint32) (request_lsn >> 32), (uint32) request_lsn), @@ -2107,7 +2162,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, ((NeonErrorResponse *) resp)->message))); break; default: - elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag); + neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag); } /* buffer was used, clean up for later reuse */ @@ -2131,7 +2186,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer switch (reln->smgr_relpersistence) { case 0: - elog(ERROR, "cannot call smgrread() on rel with unknown persistence"); + neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence"); case RELPERSISTENCE_PERMANENT: break; @@ -2142,7 +2197,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer return; default: - elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); + neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } /* Try to read from local file cache */ @@ -2170,7 +2225,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer { if (!PageIsNew((Page) pageserver_masked)) { - elog(PANIC, "page is new in MD but not in Page Server at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n", + neon_log(PANIC, "page is new in MD but not in Page Server at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n", blkno, RelFileInfoFmt(InfoFromSMgrRel(reln)), forkNum, @@ -2180,7 +2235,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer } else if (PageIsNew((Page) buffer)) { - elog(PANIC, "page is new in Page Server but not in MD at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n", + neon_log(PANIC, "page is new in Page Server but not in MD at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n", blkno, RelFileInfoFmt(InfoFromSMgrRel(reln)), forkNum, @@ -2195,7 +2250,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer if (memcmp(mdbuf_masked, pageserver_masked, BLCKSZ) != 0) { - elog(PANIC, "heap buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n", + neon_log(PANIC, "heap buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n", blkno, RelFileInfoFmt(InfoFromSMgrRel(reln)), forkNum, @@ -2214,7 +2269,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer if (memcmp(mdbuf_masked, pageserver_masked, BLCKSZ) != 0) { - elog(PANIC, "btree buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n", + neon_log(PANIC, "btree buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n", blkno, RelFileInfoFmt(InfoFromSMgrRel(reln)), forkNum, @@ -2294,13 +2349,13 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo return; default: - elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); + neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } neon_wallog_page(reln, forknum, blocknum, buffer, false); lsn = PageGetLSN((Page) buffer); - elog(SmgrTrace, "smgrwrite called for %u/%u/%u.%u blk %u, page LSN: %X/%08X", + neon_log(SmgrTrace, "smgrwrite called for %u/%u/%u.%u blk %u, page LSN: %X/%08X", RelFileInfoFmt(InfoFromSMgrRel(reln)), forknum, blocknum, (uint32) (lsn >> 32), (uint32) lsn); @@ -2327,7 +2382,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) switch (reln->smgr_relpersistence) { case 0: - elog(ERROR, "cannot call smgrnblocks() on rel with unknown persistence"); + neon_log(ERROR, "cannot call smgrnblocks() on rel with unknown persistence"); break; case RELPERSISTENCE_PERMANENT: @@ -2338,12 +2393,12 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) return mdnblocks(reln, forknum); default: - elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); + neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } if (get_cached_relsize(InfoFromSMgrRel(reln), forknum, &n_blocks)) { - elog(SmgrTrace, "cached nblocks for %u/%u/%u.%u: %u blocks", + neon_log(SmgrTrace, "cached nblocks for %u/%u/%u.%u: %u blocks", RelFileInfoFmt(InfoFromSMgrRel(reln)), forknum, n_blocks); return n_blocks; @@ -2371,7 +2426,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) case T_NeonErrorResponse: ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X", + errmsg(NEON_TAG "could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X", RelFileInfoFmt(InfoFromSMgrRel(reln)), forknum, (uint32) (request_lsn >> 32), (uint32) request_lsn), @@ -2380,11 +2435,11 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) break; default: - elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag); + neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag); } update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks); - elog(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks", + neon_log(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks", RelFileInfoFmt(InfoFromSMgrRel(reln)), forknum, (uint32) (request_lsn >> 32), (uint32) request_lsn, @@ -2427,7 +2482,7 @@ neon_dbsize(Oid dbNode) case T_NeonErrorResponse: ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("could not read db size of db %u from page server at lsn %X/%08X", + errmsg(NEON_TAG "could not read db size of db %u from page server at lsn %X/%08X", dbNode, (uint32) (request_lsn >> 32), (uint32) request_lsn), errdetail("page server returned error: %s", @@ -2435,10 +2490,10 @@ neon_dbsize(Oid dbNode) break; default: - elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag); + neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag); } - elog(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes", + neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes", dbNode, (uint32) (request_lsn >> 32), (uint32) request_lsn, db_size); @@ -2458,7 +2513,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks) switch (reln->smgr_relpersistence) { case 0: - elog(ERROR, "cannot call smgrtruncate() on rel with unknown persistence"); + neon_log(ERROR, "cannot call smgrtruncate() on rel with unknown persistence"); break; case RELPERSISTENCE_PERMANENT: @@ -2470,7 +2525,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks) return; default: - elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); + neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } set_cached_relsize(InfoFromSMgrRel(reln), forknum, nblocks); @@ -2526,7 +2581,7 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum) switch (reln->smgr_relpersistence) { case 0: - elog(ERROR, "cannot call smgrimmedsync() on rel with unknown persistence"); + neon_log(ERROR, "cannot call smgrimmedsync() on rel with unknown persistence"); break; case RELPERSISTENCE_PERMANENT: @@ -2538,10 +2593,10 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum) return; default: - elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); + neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - elog(SmgrTrace, "[NEON_SMGR] immedsync noop"); + neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop"); #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) @@ -2566,17 +2621,17 @@ neon_start_unlogged_build(SMgrRelation reln) * progress at a time. That's enough for the current usage. */ if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS) - elog(ERROR, "unlogged relation build is already in progress"); + neon_log(ERROR, "unlogged relation build is already in progress"); Assert(unlogged_build_rel == NULL); ereport(SmgrTrace, - (errmsg("starting unlogged build of relation %u/%u/%u", + (errmsg(NEON_TAG "starting unlogged build of relation %u/%u/%u", RelFileInfoFmt(InfoFromSMgrRel(reln))))); switch (reln->smgr_relpersistence) { case 0: - elog(ERROR, "cannot call smgr_start_unlogged_build() on rel with unknown persistence"); + neon_log(ERROR, "cannot call smgr_start_unlogged_build() on rel with unknown persistence"); break; case RELPERSISTENCE_PERMANENT: @@ -2589,11 +2644,11 @@ neon_start_unlogged_build(SMgrRelation reln) return; default: - elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); + neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } if (smgrnblocks(reln, MAIN_FORKNUM) != 0) - elog(ERROR, "cannot perform unlogged index build, index is not empty "); + neon_log(ERROR, "cannot perform unlogged index build, index is not empty "); unlogged_build_rel = reln; unlogged_build_phase = UNLOGGED_BUILD_PHASE_1; @@ -2620,7 +2675,7 @@ neon_finish_unlogged_build_phase_1(SMgrRelation reln) Assert(unlogged_build_rel == reln); ereport(SmgrTrace, - (errmsg("finishing phase 1 of unlogged build of relation %u/%u/%u", + (errmsg(NEON_TAG "finishing phase 1 of unlogged build of relation %u/%u/%u", RelFileInfoFmt(InfoFromSMgrRel(reln))))); if (unlogged_build_phase == UNLOGGED_BUILD_NOT_PERMANENT) @@ -2649,7 +2704,7 @@ neon_end_unlogged_build(SMgrRelation reln) Assert(unlogged_build_rel == reln); ereport(SmgrTrace, - (errmsg("ending unlogged build of relation %u/%u/%u", + (errmsg(NEON_TAG "ending unlogged build of relation %u/%u/%u", RelFileInfoFmt(InfoFromNInfoB(rinfob))))); if (unlogged_build_phase != UNLOGGED_BUILD_NOT_PERMANENT) @@ -2664,7 +2719,7 @@ neon_end_unlogged_build(SMgrRelation reln) rinfob = InfoBFromSMgrRel(reln); for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++) { - elog(SmgrTrace, "forgetting cached relsize for %u/%u/%u.%u", + neon_log(SmgrTrace, "forgetting cached relsize for %u/%u/%u.%u", RelFileInfoFmt(InfoFromNInfoB(rinfob)), forknum); @@ -2707,7 +2762,7 @@ AtEOXact_neon(XactEvent event, void *arg) unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS; ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - (errmsg("unlogged index build was not properly finished")))); + (errmsg(NEON_TAG "unlogged index build was not properly finished")))); } break; } @@ -2806,14 +2861,14 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, set_cached_relsize(rinfo, forknum, relsize); SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum); - elog(SmgrTrace, "Set length to %d", relsize); + neon_log(SmgrTrace, "Set length to %d", relsize); } } #define FSM_TREE_DEPTH ((SlotsPerFSMPage >= 1626) ? 3 : 4) /* - * TODO: May be it is better to make correspondent fgunctio from freespace.c public? + * TODO: May be it is better to make correspondent function from freespace.c public? */ static BlockNumber get_fsm_physical_block(BlockNumber heapblk) @@ -2894,7 +2949,7 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id) #if PG_VERSION_NUM < 150000 if (!XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno)) - elog(PANIC, "failed to locate backup block with ID %d", block_id); + neon_log(PANIC, "failed to locate backup block with ID %d", block_id); #else XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno); #endif