Compare commits

...

26 Commits

Author SHA1 Message Date
Konstantin Knizhnik
8de5b63ac4 Remove assert for non-zero number of shards from AssignPageserverConnstring 2023-12-20 09:16:54 +02:00
Konstantin Knizhnik
338bf7a446 Bump postgres version 2023-12-19 22:17:07 +02:00
Konstantin Knizhnik
767ce2b187 Bump Postgres version 2023-12-19 16:21:15 +02:00
Konstantin Knizhnik
80bf8d4761 Allow empty connection string 2023-12-19 14:45:25 +02:00
Konstantin Knizhnik
c2b396905f Merge with main 2023-12-19 10:20:56 +02:00
Konstantin Knizhnik
b43ae6e26c Address review comments 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
151870b666 Update pgxn/neon/libpagestore.c
Co-authored-by: Sasha Krassovsky <sasha@neon.tech>
2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
4cdfff75f2 Update pgxn/neon/libpagestore.c
Co-authored-by: Sasha Krassovsky <sasha@neon.tech>
2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
02abe3c82e Fix problem with stats collector at pg14 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
5c8b3d997f Add [NEON_SMGR] to all messages produced by Neon exrtension 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
b465185738 Add [NEON_SMGR] to all messages produced by Neon exrtension 2023-12-19 09:22:42 +02:00
John Spray
4e47d3a984 pgxn: amend key hashing 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
3853646c75 [see #6052] make connection logging shard-aware 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
86c76d4b59 Fix shard map reload synchronization 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
864d5f84e9 Fix shard map reload mechanism 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
38f64ea8ef Load shard map only at postmaster 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
2855074f55 Fix comments 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
16b0348c1f Do not deop PS connections of config reload if connection strings are not changed 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
93c75c97e1 Fix shard hash caclulation 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
d3221243f8 Minor refectoring 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
3cbd2df0b3 Add neon.stripe_size 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
8ddfa0515e Undo occsional changed in control_place_connector.c 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
3019f8fe8f Merge with main 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
e720b5f9d4 Load shardmap from postgresql.conf 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
039fa60446 Take in account stripe size when calculating shard hash number 2023-12-19 09:22:42 +02:00
Konstantin Knizhnik
7ca20dd12b Add support for PS shardoing in compute 2023-12-19 09:22:42 +02:00
8 changed files with 489 additions and 289 deletions

View File

@@ -308,13 +308,13 @@ lfc_change_limit_hook(int newval, void *extra)
Assert(victim->access_count == 0); Assert(victim->access_count == 0);
#ifdef FALLOC_FL_PUNCH_HOLE #ifdef FALLOC_FL_PUNCH_HOLE
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0) if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0)
elog(LOG, "Failed to punch hole in file: %m"); neon_log(LOG, "Failed to punch hole in file: %m");
#endif #endif
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
lfc_ctl->used -= 1; lfc_ctl->used -= 1;
} }
lfc_ctl->limit = new_size; lfc_ctl->limit = new_size;
elog(DEBUG1, "set local file cache limit to %d", new_size); neon_log(DEBUG1, "set local file cache limit to %d", new_size);
LWLockRelease(lfc_lock); LWLockRelease(lfc_lock);
} }
@@ -327,7 +327,7 @@ lfc_init(void)
* shared_preload_libraries. * shared_preload_libraries.
*/ */
if (!process_shared_preload_libraries_in_progress) if (!process_shared_preload_libraries_in_progress)
elog(ERROR, "Neon module should be loaded via shared_preload_libraries"); neon_log(ERROR, "Neon module should be loaded via shared_preload_libraries");
DefineCustomIntVariable("neon.max_file_cache_size", DefineCustomIntVariable("neon.max_file_cache_size",
@@ -643,7 +643,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void
Assert(victim->access_count == 0); Assert(victim->access_count == 0);
entry->offset = victim->offset; /* grab victim's chunk */ entry->offset = victim->offset; /* grab victim's chunk */
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
elog(DEBUG2, "Swap file cache page"); neon_log(DEBUG2, "Swap file cache page");
} }
else else
{ {
@@ -846,10 +846,10 @@ local_cache_pages(PG_FUNCTION_ARGS)
* wrong) function definition though. * wrong) function definition though.
*/ */
if (get_call_result_type(fcinfo, NULL, &expected_tupledesc) != TYPEFUNC_COMPOSITE) if (get_call_result_type(fcinfo, NULL, &expected_tupledesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type"); neon_log(ERROR, "return type must be a row type");
if (expected_tupledesc->natts != NUM_LOCALCACHE_PAGES_ELEM) if (expected_tupledesc->natts != NUM_LOCALCACHE_PAGES_ELEM)
elog(ERROR, "incorrect number of output arguments"); neon_log(ERROR, "incorrect number of output arguments");
/* Construct a tuple descriptor for the result rows. */ /* Construct a tuple descriptor for the result rows. */
tupledesc = CreateTemplateTupleDesc(expected_tupledesc->natts); tupledesc = CreateTemplateTupleDesc(expected_tupledesc->natts);

View File

@@ -15,6 +15,7 @@
#include "postgres.h" #include "postgres.h"
#include "access/xlog.h" #include "access/xlog.h"
#include "common/hashfn.h"
#include "fmgr.h" #include "fmgr.h"
#include "libpq-fe.h" #include "libpq-fe.h"
#include "libpq/libpq.h" #include "libpq/libpq.h"
@@ -37,17 +38,6 @@
#define RECONNECT_INTERVAL_USEC 1000000 #define RECONNECT_INTERVAL_USEC 1000000
bool connected = false;
PGconn *pageserver_conn = NULL;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *pageserver_conn_wes = NULL;
/* GUCs */ /* GUCs */
char *neon_timeline; char *neon_timeline;
char *neon_tenant; char *neon_tenant;
@@ -58,87 +48,206 @@ char *neon_auth_token;
int readahead_buffer_size = 128; int readahead_buffer_size = 128;
int flush_every_n_requests = 8; int flush_every_n_requests = 8;
static int n_reconnect_attempts = 0; static int n_reconnect_attempts = 0;
static int max_reconnect_attempts = 60; static int max_reconnect_attempts = 60;
static int stripe_size;
#define MAX_PAGESERVER_CONNSTRING_SIZE 256 bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
static bool pageserver_flush(shardno_t shard_no);
static void pageserver_disconnect(shardno_t shard_no);
static void AssignPageserverConnstring(const char *newval, void *extra);
static bool CheckPageserverConnstring(char **newval, void **extra, GucSource source);
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
/*
* ShardMap is kept in shared memory. It contains the connection strings for
* each shard.
*
* There is a "neon.pageserver_connstring" GUC with the PGC_SIGHUP option, which allows changing it using
* pg_reload_conf(). It is used by the control plane to update shard information if a page server has crashed,
* been relocated, or new shards have been added. This GUC variable contains a comma-separated list of connection strings.
* It is copied to shared memory because the config cannot be reloaded during query execution, yet we may need to
* reestablish the connection to the page server at that point.
*
* So copying the connection strings to shared memory is usually done by the postmaster. Other backends
* check the update counters to determine if a connection URL has changed and the connection needs to be reestablished.
*
* But at startup shared memory is not yet initialized, so the copy has to be performed by some other process.
* Moreover, we cannot use standard Postgres LW-locks, because the postmaster has no proc entry and so cannot wait
* on this primitive. This is why a lockless access algorithm is implemented, using two atomic counters to enforce
* consistent reading of the connection string values from shared memory.
*/
typedef struct
{
size_t n_shards;
pg_atomic_uint64 begin_update_counter;
pg_atomic_uint64 end_update_counter;
char shard_connstr[MAX_SHARDS][MAX_PS_CONNSTR_LEN];
} ShardMap;
static ShardMap* shard_map;
static uint64 shard_map_update_counter;
typedef struct typedef struct
{ {
LWLockId lock; /*
pg_atomic_uint64 update_counter; * Connection for each shard
char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE]; */
} PagestoreShmemState; PGconn *conn;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on 'conn'
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *wes;
} PageServer;
#if PG_VERSION_NUM >= 150000 static PageServer page_servers[MAX_SHARDS];
static shmem_request_hook_type prev_shmem_request_hook = NULL; static shardno_t max_attached_shard_no;
static void walproposer_shmem_request(void);
/*
 * Shared memory startup hook.
 *
 * Chains to the previously installed startup hook (if any), then looks up or
 * creates the "shard_map" structure under AddinShmemInitLock. When the
 * structure did not exist yet, it is reset to an empty map with zeroed update
 * counters and populated from the current value of page_server_connstring.
 */
static void
psm_shmem_startup(void)
{
	bool		already_exists;

	if (prev_shmem_startup_hook)
		prev_shmem_startup_hook();

	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);

	shard_map = (ShardMap *) ShmemInitStruct("shard_map", sizeof(ShardMap), &already_exists);
	if (already_exists)
	{
		/* Somebody else has initialized the map already: nothing more to do. */
		LWLockRelease(AddinShmemInitLock);
		return;
	}

	/* First attach: start from an empty map, then load it from the GUC value. */
	shard_map->n_shards = 0;
	pg_atomic_init_u64(&shard_map->begin_update_counter, 0);
	pg_atomic_init_u64(&shard_map->end_update_counter, 0);
	AssignPageserverConnstring(page_server_connstring, NULL);

	LWLockRelease(AddinShmemInitLock);
}
static void
psm_shmem_request(void)
{
#if PG_VERSION_NUM>=150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif #endif
static shmem_startup_hook_type prev_shmem_startup_hook;
static PagestoreShmemState *pagestore_shared;
static uint64 pagestore_local_counter = 0;
static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
static bool pageserver_flush(void); RequestAddinShmemSpace(sizeof(ShardMap));
static void pageserver_disconnect(void);
static bool
PagestoreShmemIsValid()
{
return pagestore_shared && UsedShmemSegAddr;
}
static bool
CheckPageserverConnstring(char **newval, void **extra, GucSource source)
{
return strlen(*newval) < MAX_PAGESERVER_CONNSTRING_SIZE;
} }
static void static void
AssignPageserverConnstring(const char *newval, void *extra) psm_init(void)
{ {
if (!PagestoreShmemIsValid()) prev_shmem_startup_hook = shmem_startup_hook;
return; shmem_startup_hook = psm_shmem_startup;
LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE); #if PG_VERSION_NUM>=150000
strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE); prev_shmem_request_hook = shmem_request_hook;
pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1); shmem_request_hook = psm_shmem_request;
LWLockRelease(pagestore_shared->lock); #else
psm_shmem_request();
#endif
}
/*
 * Reload the shard map if needed and return the number of shards.
 *
 * 'shard_no' is the shard whose connection string is requested.
 * 'connstr' is an output buffer. If not NULL, it must point to a buffer at least
 * MAX_PS_CONNSTR_LEN bytes long. The connection string for the given shard is
 * copied to it.
 *
 * Raises ERROR if 'shard_no' is not less than the current number of shards.
 *
 * As a side effect, if the shard map's update counter differs from the value
 * this process remembered on the previous call, every connection currently
 * held in page_servers[] is dropped and max_attached_shard_no is reset to 0.
 */
static shardno_t
load_shard_map(shardno_t shard_no, char* connstr)
{
	shardno_t	n_shards;
	uint64		begin_update_counter;
	uint64		end_update_counter;

	/*
	 * There is a race condition here between a backend and the process updating
	 * the shard map. The map is read optimistically and the update counters are
	 * rechecked afterwards; the read is retried until a snapshot is obtained
	 * during which no update was started or in progress.
	 */
	do
	{
		begin_update_counter = pg_atomic_read_u64(&shard_map->begin_update_counter);
		end_update_counter = pg_atomic_read_u64(&shard_map->end_update_counter);

		/* The counters must be read before the data they protect. */
		pg_memory_barrier();

		n_shards = shard_map->n_shards;
		if (connstr && shard_no < n_shards)
		{
			/*
			 * We need to use strlcpy here because, due to the race condition,
			 * the string in shared memory may not be zero terminated.
			 */
			strlcpy(connstr, shard_map->shard_connstr[shard_no], MAX_PS_CONNSTR_LEN);
		}

		/*
		 * The data must be read before the counters are rechecked. This is
		 * needed even when 'connstr' is NULL, because n_shards is protected by
		 * the same counters.
		 */
		pg_memory_barrier();
	}
	while (begin_update_counter != end_update_counter
		   || begin_update_counter != pg_atomic_read_u64(&shard_map->begin_update_counter)
		   || end_update_counter != pg_atomic_read_u64(&shard_map->end_update_counter));

	/*
	 * Validate the shard number only against a consistent snapshot, so that a
	 * concurrent update cannot trigger a spurious error. shardno_t is unsigned,
	 * hence %u.
	 */
	if (shard_no >= n_shards)
		neon_log(ERROR, "Shard %u is greater or equal than number of shards %u", shard_no, n_shards);

	if (shard_map_update_counter != end_update_counter)
	{
		/* Reset all connections if connection strings are changed */
		for (shardno_t i = 0; i < max_attached_shard_no; i++)
		{
			if (page_servers[i].conn)
				pageserver_disconnect(i);
		}
		max_attached_shard_no = 0;
		shard_map_update_counter = end_update_counter;
	}

	return n_shards;
}
#define MB (1024*1024)
shardno_t
get_shard_number(BufferTag* tag)
{
shardno_t n_shards = load_shard_map(0, NULL);
uint32 hash;
#if PG_MAJORVERSION_NUM < 16
hash = murmurhash32(tag->rnode.relNode);
hash = hash_combine(hash, murmurhash32(tag->blockNum/(MB/BLCKSZ)/stripe_size));
#else
hash = murmurhash32(tag->relNumber);
hash = hash_combine(hash, murmurhash32(tag->blockNum/(MB/BLCKSZ)/stripe_size));
#endif
return hash % n_shards;
} }
static bool static bool
CheckConnstringUpdated() pageserver_connect(shardno_t shard_no, int elevel)
{
if (!PagestoreShmemIsValid())
return false;
return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
}
static void
ReloadConnstring()
{
if (!PagestoreShmemIsValid())
return;
LWLockAcquire(pagestore_shared->lock, LW_SHARED);
strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter);
LWLockRelease(pagestore_shared->lock);
}
static bool
pageserver_connect(int elevel)
{ {
char *query; char *query;
int ret; int ret;
const char *keywords[3]; const char *keywords[3];
const char *values[3]; const char *values[3];
int n; int n;
PGconn* conn;
WaitEventSet *wes;
char connstr[MAX_PS_CONNSTR_LEN];
Assert(!connected); Assert(page_servers[shard_no].conn == NULL);
if (CheckConnstringUpdated()) (void)load_shard_map(shard_no, connstr); /* refresh page map if needed */
{
ReloadConnstring();
}
/* /*
* Connect using the connection string we got from the * Connect using the connection string we got from the
@@ -158,50 +267,47 @@ pageserver_connect(int elevel)
n++; n++;
} }
keywords[n] = "dbname"; keywords[n] = "dbname";
values[n] = local_pageserver_connstring; values[n] = connstr;
n++; n++;
keywords[n] = NULL; keywords[n] = NULL;
values[n] = NULL; values[n] = NULL;
n++; n++;
pageserver_conn = PQconnectdbParams(keywords, values, 1); conn = PQconnectdbParams(keywords, values, 1);
if (PQstatus(pageserver_conn) == CONNECTION_BAD) if (PQstatus(conn) == CONNECTION_BAD)
{ {
char *msg = pchomp(PQerrorMessage(pageserver_conn)); char *msg = pchomp(PQerrorMessage(conn));
PQfinish(pageserver_conn); PQfinish(conn);
pageserver_conn = NULL;
ereport(elevel, ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "could not establish connection to pageserver"), errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
errdetail_internal("%s", msg))); errdetail_internal("%s", msg)));
return false; return false;
} }
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline); query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
ret = PQsendQuery(pageserver_conn, query); ret = PQsendQuery(conn, query);
if (ret != 1) if (ret != 1)
{ {
PQfinish(pageserver_conn); PQfinish(conn);
pageserver_conn = NULL; neon_shard_log(shard_no, elevel, "could not send pagestream command to pageserver");
neon_log(elevel, "could not send pagestream command to pageserver");
return false; return false;
} }
pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3); wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET, AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL); MyLatch, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL); NULL, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL); AddWaitEventToSet(wes, WL_SOCKET_READABLE, PQsocket(conn), NULL, NULL);
while (PQisBusy(pageserver_conn)) while (PQisBusy(conn))
{ {
WaitEvent event; WaitEvent event;
/* Sleep until there's something to do */ /* Sleep until there's something to do */
(void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION); (void) WaitEventSetWait(wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch); ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
@@ -209,25 +315,25 @@ pageserver_connect(int elevel)
/* Data available in socket? */ /* Data available in socket? */
if (event.events & WL_SOCKET_READABLE) if (event.events & WL_SOCKET_READABLE)
{ {
if (!PQconsumeInput(pageserver_conn)) if (!PQconsumeInput(conn))
{ {
char *msg = pchomp(PQerrorMessage(pageserver_conn)); char *msg = pchomp(PQerrorMessage(conn));
PQfinish(pageserver_conn); PQfinish(conn);
pageserver_conn = NULL; FreeWaitEventSet(wes);
FreeWaitEventSet(pageserver_conn_wes);
pageserver_conn_wes = NULL;
neon_log(elevel, "could not complete handshake with pageserver: %s", neon_shard_log(shard_no, elevel, "could not complete handshake with pageserver: %s",
msg); msg);
return false; return false;
} }
} }
} }
neon_log(LOG, "libpagestore: connected to '%s'", page_server_connstring); neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s'", connstr);
page_servers[shard_no].conn = conn;
page_servers[shard_no].wes = wes;
max_attached_shard_no = Max(shard_no+1, max_attached_shard_no);
connected = true;
return true; return true;
} }
@@ -235,10 +341,10 @@ pageserver_connect(int elevel)
* A wrapper around PQgetCopyData that checks for interrupts while sleeping. * A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/ */
static int static int
call_PQgetCopyData(char **buffer) call_PQgetCopyData(shardno_t shard_no, char **buffer)
{ {
int ret; int ret;
PGconn* pageserver_conn = page_servers[shard_no].conn;
retry: retry:
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ ); ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
@@ -247,7 +353,7 @@ retry:
WaitEvent event; WaitEvent event;
/* Sleep until there's something to do */ /* Sleep until there's something to do */
(void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION); (void) WaitEventSetWait(page_servers[shard_no].wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch); ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
@@ -259,7 +365,7 @@ retry:
{ {
char *msg = pchomp(PQerrorMessage(pageserver_conn)); char *msg = pchomp(PQerrorMessage(pageserver_conn));
neon_log(LOG, "could not get response from pageserver: %s", msg); neon_shard_log(shard_no, LOG, "could not get response from pageserver: %s", msg);
pfree(msg); pfree(msg);
return -1; return -1;
} }
@@ -273,7 +379,7 @@ retry:
static void static void
pageserver_disconnect(void) pageserver_disconnect(shardno_t shard_no)
{ {
/* /*
* If anything goes wrong while we were sending a request, it's not clear * If anything goes wrong while we were sending a request, it's not clear
@@ -282,38 +388,32 @@ pageserver_disconnect(void)
* time later after we have already sent a new unrelated request. Close * time later after we have already sent a new unrelated request. Close
* the connection to avoid getting confused. * the connection to avoid getting confused.
*/ */
if (connected) if (page_servers[shard_no].conn)
{ {
neon_log(LOG, "dropping connection to page server due to error"); neon_shard_log(shard_no, LOG, "dropping connection to page server due to error");
PQfinish(pageserver_conn); PQfinish(page_servers[shard_no].conn);
pageserver_conn = NULL; page_servers[shard_no].conn = NULL;
connected = false;
prefetch_on_ps_disconnect(); prefetch_on_ps_disconnect();
} }
if (pageserver_conn_wes != NULL) if (page_servers[shard_no].wes != NULL)
{ {
FreeWaitEventSet(pageserver_conn_wes); FreeWaitEventSet(page_servers[shard_no].wes);
pageserver_conn_wes = NULL; page_servers[shard_no].wes = NULL;
} }
} }
static bool static bool
pageserver_send(NeonRequest *request) pageserver_send(shardno_t shard_no, NeonRequest *request)
{ {
StringInfoData req_buff; StringInfoData req_buff;
PGconn* pageserver_conn = page_servers[shard_no].conn;
if (CheckConnstringUpdated())
{
pageserver_disconnect();
ReloadConnstring();
}
/* If the connection was lost for some reason, reconnect */ /* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD) if (pageserver_conn && PQstatus(pageserver_conn) == CONNECTION_BAD)
{ {
neon_log(LOG, "pageserver_send disconnect bad connection"); neon_shard_log(shard_no, LOG, "pageserver_send disconnect bad connection");
pageserver_disconnect(); pageserver_disconnect(shard_no);
} }
req_buff = nm_pack_request(request); req_buff = nm_pack_request(request);
@@ -327,9 +427,9 @@ pageserver_send(NeonRequest *request)
* https://github.com/neondatabase/neon/issues/1138 So try to reestablish * https://github.com/neondatabase/neon/issues/1138 So try to reestablish
* connection in case of failure. * connection in case of failure.
*/ */
if (!connected) if (!page_servers[shard_no].conn)
{ {
while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR)) while (!pageserver_connect(shard_no, n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{ {
HandleMainLoopInterrupts(); HandleMainLoopInterrupts();
n_reconnect_attempts += 1; n_reconnect_attempts += 1;
@@ -338,7 +438,9 @@ pageserver_send(NeonRequest *request)
n_reconnect_attempts = 0; n_reconnect_attempts = 0;
} }
/* pageserver_conn = page_servers[shard_no].conn;
/*
* Send request. * Send request.
* *
* In principle, this could block if the output buffer is full, and we * In principle, this could block if the output buffer is full, and we
@@ -349,9 +451,8 @@ pageserver_send(NeonRequest *request)
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0) if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
{ {
char *msg = pchomp(PQerrorMessage(pageserver_conn)); char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect(shard_no);
pageserver_disconnect(); neon_shard_log(shard_no, LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg);
neon_log(LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg);
pfree(msg); pfree(msg);
pfree(req_buff.data); pfree(req_buff.data);
return false; return false;
@@ -363,19 +464,19 @@ pageserver_send(NeonRequest *request)
{ {
char *msg = nm_to_string((NeonMessage *) request); char *msg = nm_to_string((NeonMessage *) request);
neon_log(PageStoreTrace, "sent request: %s", msg); neon_shard_log(shard_no, PageStoreTrace, "sent request: %s", msg);
pfree(msg); pfree(msg);
} }
return true; return true;
} }
static NeonResponse * static NeonResponse *
pageserver_receive(void) pageserver_receive(shardno_t shard_no)
{ {
StringInfoData resp_buff; StringInfoData resp_buff;
NeonResponse *resp; NeonResponse *resp;
PGconn* pageserver_conn = page_servers[shard_no].conn;
if (!connected) if (!pageserver_conn)
return NULL; return NULL;
PG_TRY(); PG_TRY();
@@ -383,7 +484,7 @@ pageserver_receive(void)
/* read response */ /* read response */
int rc; int rc;
rc = call_PQgetCopyData(&resp_buff.data); rc = call_PQgetCopyData(shard_no, &resp_buff.data);
if (rc >= 0) if (rc >= 0)
{ {
resp_buff.len = rc; resp_buff.len = rc;
@@ -395,33 +496,33 @@ pageserver_receive(void)
{ {
char *msg = nm_to_string((NeonMessage *) resp); char *msg = nm_to_string((NeonMessage *) resp);
neon_log(PageStoreTrace, "got response: %s", msg); neon_shard_log(shard_no, PageStoreTrace, "got response: %s", msg);
pfree(msg); pfree(msg);
} }
} }
else if (rc == -1) else if (rc == -1)
{ {
neon_log(LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn))); neon_shard_log(shard_no, LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn)));
pageserver_disconnect(); pageserver_disconnect(shard_no);
resp = NULL; resp = NULL;
} }
else if (rc == -2) else if (rc == -2)
{ {
char *msg = pchomp(PQerrorMessage(pageserver_conn)); char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect(); pageserver_disconnect(shard_no);
neon_log(ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg); neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg);
} }
else else
{ {
pageserver_disconnect(); pageserver_disconnect(shard_no);
neon_log(ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc); neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc);
} }
} }
PG_CATCH(); PG_CATCH();
{ {
neon_log(LOG, "pageserver_receive disconnect due to caught exception"); neon_shard_log(shard_no, LOG, "pageserver_receive disconnect due to caught exception");
pageserver_disconnect(); pageserver_disconnect(shard_no);
PG_RE_THROW(); PG_RE_THROW();
} }
PG_END_TRY(); PG_END_TRY();
@@ -431,11 +532,12 @@ pageserver_receive(void)
static bool static bool
pageserver_flush(void) pageserver_flush(shardno_t shard_no)
{ {
if (!connected) PGconn* pageserver_conn = page_servers[shard_no].conn;
if (!pageserver_conn)
{ {
neon_log(WARNING, "Tried to flush while disconnected"); neon_shard_log(shard_no, WARNING, "Tried to flush while disconnected");
} }
else else
{ {
@@ -443,8 +545,8 @@ pageserver_flush(void)
{ {
char *msg = pchomp(PQerrorMessage(pageserver_conn)); char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect(); pageserver_disconnect(shard_no);
neon_log(LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg); neon_shard_log(shard_no, LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg);
pfree(msg); pfree(msg);
return false; return false;
} }
@@ -467,63 +569,83 @@ check_neon_id(char **newval, void **extra, GucSource source)
return **newval == '\0' || HexDecodeString(id, *newval, 16); return **newval == '\0' || HexDecodeString(id, *newval, 16);
} }
static Size
PagestoreShmemSize(void)
{
return sizeof(PagestoreShmemState);
}
static bool static bool
PagestoreShmemInit(void) CheckPageserverConnstring(char **newval, void **extra, GucSource source)
{ {
bool found; const char* shard_connstr = *newval;
const char* sep;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); size_t connstr_len;
pagestore_shared = ShmemInitStruct("libpagestore shared state", int i = 0;
PagestoreShmemSize(), do
&found);
if (!found)
{ {
pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock); sep = strchr(shard_connstr, ',');
pg_atomic_init_u64(&pagestore_shared->update_counter, 0); connstr_len = sep != NULL ? sep - shard_connstr : strlen(shard_connstr);
AssignPageserverConnstring(page_server_connstring, NULL); if (connstr_len == 0)
break; /* trailing comma */
if (i >= MAX_SHARDS)
{
neon_log(LOG, "Too many shards");
return false;
}
if (connstr_len >= MAX_PS_CONNSTR_LEN)
{
neon_log(LOG, "Connection string too long");
return false;
}
shard_connstr = sep + 1;
i += 1;
} while (sep != NULL);
return true;
}
static void
AssignPageserverConnstring(const char *newval, void *extra)
{
/*
* Load shard map only at Postmaster.
* If old page server is not available, then backends can be blocked in attempts to reconnect to it and do not reload config in this loop
*
* Copying GUC value to shared memory is usually performed by postmaster. But in case of startup,
* shared memory is not yet initialized. So it has to be performed by any other process.
* It is not a problem if more than one process do this initialization.
*/
if (shard_map != NULL && UsedShmemSegAddr != NULL && (MyProcPid == PostmasterPid || shard_map->n_shards == 0))
{
const char* shard_connstr = newval;
const char* sep;
size_t connstr_len;
int i = 0;
bool shard_map_changed = false;
do
{
sep = strchr(shard_connstr, ',');
connstr_len = sep != NULL ? sep - shard_connstr : strlen(shard_connstr);
if (connstr_len == 0)
break; /* trailing comma */
Assert(i < MAX_SHARDS);
Assert(connstr_len < MAX_PS_CONNSTR_LEN);
if (i >= shard_map->n_shards ||
strcmp(shard_map->shard_connstr[i], shard_connstr) != 0)
{
if (!shard_map_changed)
{
pg_atomic_add_fetch_u64(&shard_map->begin_update_counter, 1);
shard_map_changed = true;
}
memcpy(shard_map->shard_connstr[i], shard_connstr, connstr_len+1);
}
shard_connstr = sep + 1;
i += 1;
} while (sep != NULL);
if (shard_map_changed)
{
shard_map->n_shards = i;
pg_memory_barrier();
pg_atomic_add_fetch_u64(&shard_map->end_update_counter, 1);
}
} }
LWLockRelease(AddinShmemInitLock);
return found;
}
static void
pagestore_shmem_startup_hook(void)
{
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
PagestoreShmemInit();
}
static void
pagestore_shmem_request(void)
{
#if PG_VERSION_NUM >= 150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(PagestoreShmemSize());
RequestNamedLWLockTranche("neon_libpagestore", 1);
}
static void
pagestore_prepare_shmem(void)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = pagestore_shmem_request;
#else
pagestore_shmem_request();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = pagestore_shmem_startup_hook;
} }
/* /*
@@ -532,8 +654,6 @@ pagestore_prepare_shmem(void)
void void
pg_init_libpagestore(void) pg_init_libpagestore(void)
{ {
pagestore_prepare_shmem();
DefineCustomStringVariable("neon.pageserver_connstring", DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server", "connection string to the page server",
NULL, NULL,
@@ -561,6 +681,15 @@ pg_init_libpagestore(void)
0, /* no flags required */ 0, /* no flags required */
check_neon_id, NULL, NULL); check_neon_id, NULL, NULL);
DefineCustomIntVariable("neon.stripe_size",
"sharding stripe size",
NULL,
&stripe_size,
32768, 1, INT_MAX,
PGC_SIGHUP,
GUC_UNIT_BLOCKS,
NULL, NULL, NULL);
DefineCustomIntVariable("neon.max_cluster_size", DefineCustomIntVariable("neon.max_cluster_size",
"cluster size limit", "cluster size limit",
NULL, NULL,
@@ -624,4 +753,5 @@ pg_init_libpagestore(void)
} }
lfc_init(); lfc_init();
psm_init();
} }

View File

@@ -17,12 +17,20 @@
#include "access/xlogdefs.h" #include "access/xlogdefs.h"
#include RELFILEINFO_HDR #include RELFILEINFO_HDR
#include "storage/block.h"
#include "storage/smgr.h"
#include "storage/buf_internals.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "libpq/pqformat.h" #include "libpq/pqformat.h"
#include "storage/block.h" #include "storage/block.h"
#include "storage/smgr.h" #include "storage/smgr.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "pg_config.h"
#define MAX_SHARDS 128
#define MAX_PS_CONNSTR_LEN 128
typedef enum typedef enum
{ {
/* pagestore_client -> pagestore */ /* pagestore_client -> pagestore */
@@ -51,6 +59,9 @@ typedef struct
#define neon_log(tag, fmt, ...) ereport(tag, \ #define neon_log(tag, fmt, ...) ereport(tag, \
(errmsg(NEON_TAG fmt, ##__VA_ARGS__), \ (errmsg(NEON_TAG fmt, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0))) errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
#define neon_shard_log(shard_no, tag, fmt, ...) ereport(tag, \
(errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
/* /*
* supertype of all the Neon*Request structs below * supertype of all the Neon*Request structs below
@@ -141,11 +152,13 @@ extern char *nm_to_string(NeonMessage *msg);
* API * API
*/ */
typedef unsigned shardno_t;
typedef struct typedef struct
{ {
bool (*send) (NeonRequest *request); bool (*send) (shardno_t shard_no, NeonRequest * request);
NeonResponse *(*receive) (void); NeonResponse *(*receive) (shardno_t shard_no);
bool (*flush) (void); bool (*flush) (shardno_t shard_no);
} page_server_api; } page_server_api;
extern void prefetch_on_ps_disconnect(void); extern void prefetch_on_ps_disconnect(void);
@@ -159,6 +172,8 @@ extern char *neon_timeline;
extern char *neon_tenant; extern char *neon_tenant;
extern int32 max_cluster_size; extern int32 max_cluster_size;
extern shardno_t get_shard_number(BufferTag* tag);
extern const f_smgr *smgr_neon(BackendId backend, NRelFileInfo rinfo); extern const f_smgr *smgr_neon(BackendId backend, NRelFileInfo rinfo);
extern void smgr_init_neon(void); extern void smgr_init_neon(void);
extern void readahead_buffer_resize(int newsize, void *extra); extern void readahead_buffer_resize(int newsize, void *extra);

View File

@@ -172,6 +172,7 @@ typedef struct PrefetchRequest
XLogRecPtr actual_request_lsn; XLogRecPtr actual_request_lsn;
NeonResponse *response; /* may be null */ NeonResponse *response; /* may be null */
PrefetchStatus status; PrefetchStatus status;
shardno_t shard_no;
uint64 my_ring_index; uint64 my_ring_index;
} PrefetchRequest; } PrefetchRequest;
@@ -239,7 +240,9 @@ typedef struct PrefetchState
* also unused */ * also unused */
/* the buffers */ /* the buffers */
prfh_hash *prf_hash; prfh_hash *prf_hash;
int max_shard_no;
uint8 shard_bitmap[(MAX_SHARDS + 7)/8];
PrefetchRequest prf_buffer[]; /* prefetch buffers */ PrefetchRequest prf_buffer[]; /* prefetch buffers */
} PrefetchState; } PrefetchState;
@@ -327,6 +330,7 @@ compact_prefetch_buffers(void)
Assert(target_slot->status == PRFS_UNUSED); Assert(target_slot->status == PRFS_UNUSED);
target_slot->buftag = source_slot->buftag; target_slot->buftag = source_slot->buftag;
target_slot->shard_no = source_slot->shard_no;
target_slot->status = source_slot->status; target_slot->status = source_slot->status;
target_slot->response = source_slot->response; target_slot->response = source_slot->response;
target_slot->effective_request_lsn = source_slot->effective_request_lsn; target_slot->effective_request_lsn = source_slot->effective_request_lsn;
@@ -494,6 +498,23 @@ prefetch_cleanup_trailing_unused(void)
} }
} }
/*
 * Flush the connection of every shard that is marked in
 * MyPState->shard_bitmap.
 *
 * Shards are visited in ascending order, from 0 up to (but not including)
 * MyPState->max_shard_no. A shard's bit is cleared only after its flush
 * callback reported success.
 *
 * Returns false as soon as one flush fails; in that case the bit of the
 * failing shard, the bits of all higher shards and max_shard_no are left
 * as they were. Returns true when every marked shard was flushed, after
 * resetting max_shard_no to 0.
 */
static bool
prefetch_flush_requests(void)
{
	shardno_t	shard_no;

	for (shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++)
	{
		const unsigned byte_idx = shard_no >> 3;
		const unsigned bit_mask = 1u << (shard_no & 7);

		/* nothing was sent to this shard since the last flush */
		if ((MyPState->shard_bitmap[byte_idx] & bit_mask) == 0)
			continue;

		if (!page_server->flush(shard_no))
			return false;

		/* re-index through MyPState rather than caching a pointer across the callback */
		MyPState->shard_bitmap[byte_idx] &= ~bit_mask;
	}

	MyPState->max_shard_no = 0;
	return true;
}
/* /*
* Wait for slot of ring_index to have received its response. * Wait for slot of ring_index to have received its response.
* The caller is responsible for making sure the request buffer is flushed. * The caller is responsible for making sure the request buffer is flushed.
@@ -509,7 +530,7 @@ prefetch_wait_for(uint64 ring_index)
if (MyPState->ring_flush <= ring_index && if (MyPState->ring_flush <= ring_index &&
MyPState->ring_unused > MyPState->ring_flush) MyPState->ring_unused > MyPState->ring_flush)
{ {
if (!page_server->flush()) if (!prefetch_flush_requests())
return false; return false;
MyPState->ring_flush = MyPState->ring_unused; MyPState->ring_flush = MyPState->ring_unused;
} }
@@ -547,7 +568,7 @@ prefetch_read(PrefetchRequest *slot)
Assert(slot->my_ring_index == MyPState->ring_receive); Assert(slot->my_ring_index == MyPState->ring_receive);
old = MemoryContextSwitchTo(MyPState->errctx); old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive(); response = (NeonResponse *) page_server->receive(slot->shard_no);
MemoryContextSwitchTo(old); MemoryContextSwitchTo(old);
if (response) if (response)
{ {
@@ -704,12 +725,14 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
Assert(slot->response == NULL); Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused); Assert(slot->my_ring_index == MyPState->ring_unused);
while (!page_server->send((NeonRequest *) &request)); while (!page_server->send(slot->shard_no, (NeonRequest *) &request));
/* update prefetch state */ /* update prefetch state */
MyPState->n_requests_inflight += 1; MyPState->n_requests_inflight += 1;
MyPState->n_unused -= 1; MyPState->n_unused -= 1;
MyPState->ring_unused += 1; MyPState->ring_unused += 1;
MyPState->shard_bitmap[slot->shard_no >> 3] |= 1 << (slot->shard_no & 7);
MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no);
/* update slot state */ /* update slot state */
slot->status = PRFS_REQUESTED; slot->status = PRFS_REQUESTED;
@@ -880,6 +903,7 @@ Retry:
* function reads the buffer tag from the slot. * function reads the buffer tag from the slot.
*/ */
slot->buftag = tag; slot->buftag = tag;
slot->shard_no = get_shard_number(&tag);
slot->my_ring_index = ring_index; slot->my_ring_index = ring_index;
prefetch_do_request(slot, force_latest, force_lsn); prefetch_do_request(slot, force_latest, force_lsn);
@@ -890,7 +914,7 @@ Retry:
if (flush_every_n_requests > 0 && if (flush_every_n_requests > 0 &&
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests) MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
{ {
if (!page_server->flush()) if (!prefetch_flush_requests())
{ {
/* /*
* Prefetch set is reset in case of error, so we should try to * Prefetch set is reset in case of error, so we should try to
@@ -908,13 +932,44 @@ static NeonResponse *
page_server_request(void const *req) page_server_request(void const *req)
{ {
NeonResponse *resp; NeonResponse *resp;
BufferTag tag = {0};
shardno_t shard_no;
switch (((NeonRequest *) req)->tag)
{
case T_NeonExistsRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo);
break;
case T_NeonNblocksRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonNblocksRequest *) req)->rinfo);
break;
case T_NeonDbSizeRequest:
NInfoGetDbOid(BufTagGetNRelFileInfo(tag)) = ((NeonDbSizeRequest *) req)->dbNode;
break;
case T_NeonGetPageRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonGetPageRequest *) req)->rinfo);
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
break;
default:
neon_log(ERROR, "Unexpected request tag: %d", ((NeonRequest *) req)->tag);
}
shard_no = get_shard_number(&tag);
/*
* Current sharding model assumes that all metadata is present only at shard 0.
* We still need to call get_shard_number() to check if shard map is up-to-date.
*/
if (((NeonRequest *) req)->tag != T_NeonGetPageRequest || ((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM)
{
shard_no = 0;
}
do do
{ {
while (!page_server->send((NeonRequest *) req) || !page_server->flush()); while (!page_server->send(shard_no, (NeonRequest *) req) || !page_server->flush(shard_no));
MyPState->ring_flush = MyPState->ring_unused;
consume_prefetch_responses(); consume_prefetch_responses();
resp = page_server->receive(); resp = page_server->receive(shard_no);
} while (resp == NULL); } while (resp == NULL);
return resp; return resp;
@@ -990,7 +1045,7 @@ nm_pack_request(NeonRequest *msg)
case T_NeonErrorResponse: case T_NeonErrorResponse:
case T_NeonDbSizeResponse: case T_NeonDbSizeResponse:
default: default:
elog(ERROR, "unexpected neon message tag 0x%02x", msg->tag); neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag);
break; break;
} }
return s; return s;
@@ -1085,7 +1140,7 @@ nm_unpack_response(StringInfo s)
case T_NeonGetPageRequest: case T_NeonGetPageRequest:
case T_NeonDbSizeRequest: case T_NeonDbSizeRequest:
default: default:
elog(ERROR, "unexpected neon message tag 0x%02x", tag); neon_log(ERROR, "unexpected neon message tag 0x%02x", tag);
break; break;
} }
@@ -1277,7 +1332,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
XLogFlush(recptr); XLogFlush(recptr);
lsn = recptr; lsn = recptr;
ereport(SmgrTrace, ereport(SmgrTrace,
(errmsg("Page %u of relation %u/%u/%u.%u was force logged. Evicted at lsn=%X/%X", (errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u was force logged. Evicted at lsn=%X/%X",
blocknum, blocknum,
RelFileInfoFmt(InfoFromSMgrRel(reln)), RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum, LSN_FORMAT_ARGS(lsn)))); forknum, LSN_FORMAT_ARGS(lsn))));
@@ -1305,7 +1360,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
if (PageIsNew((Page) buffer)) if (PageIsNew((Page) buffer))
{ {
ereport(SmgrTrace, ereport(SmgrTrace,
(errmsg("Page %u of relation %u/%u/%u.%u is all-zeros", (errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is all-zeros",
blocknum, blocknum,
RelFileInfoFmt(InfoFromSMgrRel(reln)), RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum))); forknum)));
@@ -1313,7 +1368,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
else if (PageIsEmptyHeapPage((Page) buffer)) else if (PageIsEmptyHeapPage((Page) buffer))
{ {
ereport(SmgrTrace, ereport(SmgrTrace,
(errmsg("Page %u of relation %u/%u/%u.%u is an empty heap page with no LSN", (errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is an empty heap page with no LSN",
blocknum, blocknum,
RelFileInfoFmt(InfoFromSMgrRel(reln)), RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum))); forknum)));
@@ -1321,7 +1376,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
else else
{ {
ereport(PANIC, ereport(PANIC,
(errmsg("Page %u of relation %u/%u/%u.%u is evicted with zero LSN", (errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is evicted with zero LSN",
blocknum, blocknum,
RelFileInfoFmt(InfoFromSMgrRel(reln)), RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum))); forknum)));
@@ -1330,7 +1385,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, co
else else
{ {
ereport(SmgrTrace, ereport(SmgrTrace,
(errmsg("Page %u of relation %u/%u/%u.%u is already wal logged at lsn=%X/%X", (errmsg(NEON_TAG "Page %u of relation %u/%u/%u.%u is already wal logged at lsn=%X/%X",
blocknum, blocknum,
RelFileInfoFmt(InfoFromSMgrRel(reln)), RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum, LSN_FORMAT_ARGS(lsn)))); forknum, LSN_FORMAT_ARGS(lsn))));
@@ -1430,7 +1485,7 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block
lsn = GetLastWrittenLSN(rinfo, forknum, blkno); lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
lsn = nm_adjust_lsn(lsn); lsn = nm_adjust_lsn(lsn);
elog(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ", neon_log(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
(uint32) ((lsn) >> 32), (uint32) (lsn)); (uint32) ((lsn) >> 32), (uint32) (lsn));
} }
else else
@@ -1445,7 +1500,7 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block
*latest = true; *latest = true;
lsn = GetLastWrittenLSN(rinfo, forknum, blkno); lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
Assert(lsn != InvalidXLogRecPtr); Assert(lsn != InvalidXLogRecPtr);
elog(DEBUG1, "neon_get_request_lsn GetLastWrittenLSN lsn %X/%X ", neon_log(DEBUG1, "neon_get_request_lsn GetLastWrittenLSN lsn %X/%X ",
(uint32) ((lsn) >> 32), (uint32) (lsn)); (uint32) ((lsn) >> 32), (uint32) (lsn));
lsn = nm_adjust_lsn(lsn); lsn = nm_adjust_lsn(lsn);
@@ -1465,7 +1520,7 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block
#endif #endif
if (lsn > flushlsn) if (lsn > flushlsn)
{ {
elog(DEBUG5, "last-written LSN %X/%X is ahead of last flushed LSN %X/%X", neon_log(DEBUG5, "last-written LSN %X/%X is ahead of last flushed LSN %X/%X",
(uint32) (lsn >> 32), (uint32) lsn, (uint32) (lsn >> 32), (uint32) lsn,
(uint32) (flushlsn >> 32), (uint32) flushlsn); (uint32) (flushlsn >> 32), (uint32) flushlsn);
XLogFlush(lsn); XLogFlush(lsn);
@@ -1509,7 +1564,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
return mdexists(reln, forkNum); return mdexists(reln, forkNum);
default: default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
if (get_cached_relsize(InfoFromSMgrRel(reln), forkNum, &n_blocks)) if (get_cached_relsize(InfoFromSMgrRel(reln), forkNum, &n_blocks))
@@ -1561,7 +1616,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
case T_NeonErrorResponse: case T_NeonErrorResponse:
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_IO_ERROR), (errcode(ERRCODE_IO_ERROR),
errmsg("could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X", errmsg(NEON_TAG "could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X",
RelFileInfoFmt(InfoFromSMgrRel(reln)), RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum, forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn), (uint32) (request_lsn >> 32), (uint32) request_lsn),
@@ -1570,7 +1625,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
break; break;
default: default:
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag); neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
} }
pfree(resp); pfree(resp);
return exists; return exists;
@@ -1587,7 +1642,7 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
case 0: case 0:
elog(ERROR, "cannot call smgrcreate() on rel with unknown persistence"); neon_log(ERROR, "cannot call smgrcreate() on rel with unknown persistence");
case RELPERSISTENCE_PERMANENT: case RELPERSISTENCE_PERMANENT:
break; break;
@@ -1598,10 +1653,10 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
return; return;
default: default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
elog(SmgrTrace, "Create relation %u/%u/%u.%u", neon_log(SmgrTrace, "Create relation %u/%u/%u.%u",
RelFileInfoFmt(InfoFromSMgrRel(reln)), RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum); forkNum);
@@ -1696,7 +1751,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
case 0: case 0:
elog(ERROR, "cannot call smgrextend() on rel with unknown persistence"); neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
case RELPERSISTENCE_PERMANENT: case RELPERSISTENCE_PERMANENT:
break; break;
@@ -1707,7 +1762,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
return; return;
default: default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
/* /*
@@ -1726,7 +1781,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024) if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_DISK_FULL), (errcode(ERRCODE_DISK_FULL),
errmsg("could not extend file because project size limit (%d MB) has been exceeded", errmsg(NEON_TAG "could not extend file because project size limit (%d MB) has been exceeded",
max_cluster_size), max_cluster_size),
errhint("This limit is defined externally by the project size limit, and internally by neon.max_cluster_size GUC"))); errhint("This limit is defined externally by the project size limit, and internally by neon.max_cluster_size GUC")));
} }
@@ -1745,7 +1800,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
set_cached_relsize(InfoFromSMgrRel(reln), forkNum, blkno + 1); set_cached_relsize(InfoFromSMgrRel(reln), forkNum, blkno + 1);
lsn = PageGetLSN((Page) buffer); lsn = PageGetLSN((Page) buffer);
elog(SmgrTrace, "smgrextend called for %u/%u/%u.%u blk %u, page LSN: %X/%08X", neon_log(SmgrTrace, "smgrextend called for %u/%u/%u.%u blk %u, page LSN: %X/%08X",
RelFileInfoFmt(InfoFromSMgrRel(reln)), RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum, blkno, forkNum, blkno,
(uint32) (lsn >> 32), (uint32) lsn); (uint32) (lsn >> 32), (uint32) lsn);
@@ -1785,7 +1840,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
case 0: case 0:
elog(ERROR, "cannot call smgrextend() on rel with unknown persistence"); neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
case RELPERSISTENCE_PERMANENT: case RELPERSISTENCE_PERMANENT:
break; break;
@@ -1796,7 +1851,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
return; return;
default: default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
if (max_cluster_size > 0 && if (max_cluster_size > 0 &&
@@ -1808,7 +1863,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024) if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_DISK_FULL), (errcode(ERRCODE_DISK_FULL),
errmsg("could not extend file because cluster size limit (%d MB) has been exceeded", errmsg(NEON_TAG "could not extend file because cluster size limit (%d MB) has been exceeded",
max_cluster_size), max_cluster_size),
errhint("This limit is defined by neon.max_cluster_size GUC"))); errhint("This limit is defined by neon.max_cluster_size GUC")));
} }
@@ -1821,7 +1876,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
if ((uint64) blocknum + nblocks >= (uint64) InvalidBlockNumber) if ((uint64) blocknum + nblocks >= (uint64) InvalidBlockNumber)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("cannot extend file \"%s\" beyond %u blocks", errmsg(NEON_TAG "cannot extend file \"%s\" beyond %u blocks",
relpath(reln->smgr_rlocator, forkNum), relpath(reln->smgr_rlocator, forkNum),
InvalidBlockNumber))); InvalidBlockNumber)));
@@ -1882,7 +1937,7 @@ neon_open(SMgrRelation reln)
mdopen(reln); mdopen(reln);
/* no work */ /* no work */
elog(SmgrTrace, "[NEON_SMGR] open noop"); neon_log(SmgrTrace, "open noop");
} }
/* /*
@@ -1919,7 +1974,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
return mdprefetch(reln, forknum, blocknum); return mdprefetch(reln, forknum, blocknum);
default: default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
if (lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum)) if (lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum))
@@ -1964,11 +2019,11 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
return; return;
default: default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
/* not implemented */ /* not implemented */
elog(SmgrTrace, "[NEON_SMGR] writeback noop"); neon_log(SmgrTrace, "writeback noop");
#ifdef DEBUG_COMPARE_LOCAL #ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln)) if (IS_LOCAL_REL(reln))
@@ -2098,8 +2153,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
case T_NeonErrorResponse: case T_NeonErrorResponse:
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_IO_ERROR), (errcode(ERRCODE_IO_ERROR),
errmsg("could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", errmsg(NEON_TAG "[shard %d] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
blkno, slot->shard_no, blkno,
RelFileInfoFmt(rinfo), RelFileInfoFmt(rinfo),
forkNum, forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn), (uint32) (request_lsn >> 32), (uint32) request_lsn),
@@ -2107,7 +2162,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
((NeonErrorResponse *) resp)->message))); ((NeonErrorResponse *) resp)->message)));
break; break;
default: default:
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag); neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
} }
/* buffer was used, clean up for later reuse */ /* buffer was used, clean up for later reuse */
@@ -2131,7 +2186,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
case 0: case 0:
elog(ERROR, "cannot call smgrread() on rel with unknown persistence"); neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
case RELPERSISTENCE_PERMANENT: case RELPERSISTENCE_PERMANENT:
break; break;
@@ -2142,7 +2197,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
return; return;
default: default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
/* Try to read from local file cache */ /* Try to read from local file cache */
@@ -2170,7 +2225,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
{ {
if (!PageIsNew((Page) pageserver_masked)) if (!PageIsNew((Page) pageserver_masked))
{ {
elog(PANIC, "page is new in MD but not in Page Server at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n", neon_log(PANIC, "page is new in MD but not in Page Server at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
blkno, blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)), RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum, forkNum,
@@ -2180,7 +2235,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
} }
else if (PageIsNew((Page) buffer)) else if (PageIsNew((Page) buffer))
{ {
elog(PANIC, "page is new in Page Server but not in MD at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n", neon_log(PANIC, "page is new in Page Server but not in MD at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
blkno, blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)), RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum, forkNum,
@@ -2195,7 +2250,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
if (memcmp(mdbuf_masked, pageserver_masked, BLCKSZ) != 0) if (memcmp(mdbuf_masked, pageserver_masked, BLCKSZ) != 0)
{ {
elog(PANIC, "heap buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n", neon_log(PANIC, "heap buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
blkno, blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)), RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum, forkNum,
@@ -2214,7 +2269,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
if (memcmp(mdbuf_masked, pageserver_masked, BLCKSZ) != 0) if (memcmp(mdbuf_masked, pageserver_masked, BLCKSZ) != 0)
{ {
elog(PANIC, "btree buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n", neon_log(PANIC, "btree buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
blkno, blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)), RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum, forkNum,
@@ -2294,13 +2349,13 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
return; return;
default: default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
neon_wallog_page(reln, forknum, blocknum, buffer, false); neon_wallog_page(reln, forknum, blocknum, buffer, false);
lsn = PageGetLSN((Page) buffer); lsn = PageGetLSN((Page) buffer);
elog(SmgrTrace, "smgrwrite called for %u/%u/%u.%u blk %u, page LSN: %X/%08X", neon_log(SmgrTrace, "smgrwrite called for %u/%u/%u.%u blk %u, page LSN: %X/%08X",
RelFileInfoFmt(InfoFromSMgrRel(reln)), RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum, blocknum, forknum, blocknum,
(uint32) (lsn >> 32), (uint32) lsn); (uint32) (lsn >> 32), (uint32) lsn);
@@ -2327,7 +2382,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
case 0: case 0:
elog(ERROR, "cannot call smgrnblocks() on rel with unknown persistence"); neon_log(ERROR, "cannot call smgrnblocks() on rel with unknown persistence");
break; break;
case RELPERSISTENCE_PERMANENT: case RELPERSISTENCE_PERMANENT:
@@ -2338,12 +2393,12 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
return mdnblocks(reln, forknum); return mdnblocks(reln, forknum);
default: default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
if (get_cached_relsize(InfoFromSMgrRel(reln), forknum, &n_blocks)) if (get_cached_relsize(InfoFromSMgrRel(reln), forknum, &n_blocks))
{ {
elog(SmgrTrace, "cached nblocks for %u/%u/%u.%u: %u blocks", neon_log(SmgrTrace, "cached nblocks for %u/%u/%u.%u: %u blocks",
RelFileInfoFmt(InfoFromSMgrRel(reln)), RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum, n_blocks); forknum, n_blocks);
return n_blocks; return n_blocks;
@@ -2371,7 +2426,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
case T_NeonErrorResponse: case T_NeonErrorResponse:
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_IO_ERROR), (errcode(ERRCODE_IO_ERROR),
errmsg("could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X", errmsg(NEON_TAG "could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X",
RelFileInfoFmt(InfoFromSMgrRel(reln)), RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum, forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn), (uint32) (request_lsn >> 32), (uint32) request_lsn),
@@ -2380,11 +2435,11 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
break; break;
default: default:
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag); neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
} }
update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks); update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks);
elog(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks", neon_log(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks",
RelFileInfoFmt(InfoFromSMgrRel(reln)), RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum, forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn, (uint32) (request_lsn >> 32), (uint32) request_lsn,
@@ -2427,7 +2482,7 @@ neon_dbsize(Oid dbNode)
case T_NeonErrorResponse: case T_NeonErrorResponse:
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_IO_ERROR), (errcode(ERRCODE_IO_ERROR),
errmsg("could not read db size of db %u from page server at lsn %X/%08X", errmsg(NEON_TAG "could not read db size of db %u from page server at lsn %X/%08X",
dbNode, dbNode,
(uint32) (request_lsn >> 32), (uint32) request_lsn), (uint32) (request_lsn >> 32), (uint32) request_lsn),
errdetail("page server returned error: %s", errdetail("page server returned error: %s",
@@ -2435,10 +2490,10 @@ neon_dbsize(Oid dbNode)
break; break;
default: default:
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag); neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
} }
elog(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes", neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes",
dbNode, dbNode,
(uint32) (request_lsn >> 32), (uint32) request_lsn, (uint32) (request_lsn >> 32), (uint32) request_lsn,
db_size); db_size);
@@ -2458,7 +2513,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
case 0: case 0:
elog(ERROR, "cannot call smgrtruncate() on rel with unknown persistence"); neon_log(ERROR, "cannot call smgrtruncate() on rel with unknown persistence");
break; break;
case RELPERSISTENCE_PERMANENT: case RELPERSISTENCE_PERMANENT:
@@ -2470,7 +2525,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
return; return;
default: default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
set_cached_relsize(InfoFromSMgrRel(reln), forknum, nblocks); set_cached_relsize(InfoFromSMgrRel(reln), forknum, nblocks);
@@ -2526,7 +2581,7 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum)
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
case 0: case 0:
elog(ERROR, "cannot call smgrimmedsync() on rel with unknown persistence"); neon_log(ERROR, "cannot call smgrimmedsync() on rel with unknown persistence");
break; break;
case RELPERSISTENCE_PERMANENT: case RELPERSISTENCE_PERMANENT:
@@ -2538,10 +2593,10 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum)
return; return;
default: default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
elog(SmgrTrace, "[NEON_SMGR] immedsync noop"); neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop");
#ifdef DEBUG_COMPARE_LOCAL #ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln)) if (IS_LOCAL_REL(reln))
@@ -2566,17 +2621,17 @@ neon_start_unlogged_build(SMgrRelation reln)
* progress at a time. That's enough for the current usage. * progress at a time. That's enough for the current usage.
*/ */
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS) if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS)
elog(ERROR, "unlogged relation build is already in progress"); neon_log(ERROR, "unlogged relation build is already in progress");
Assert(unlogged_build_rel == NULL); Assert(unlogged_build_rel == NULL);
ereport(SmgrTrace, ereport(SmgrTrace,
(errmsg("starting unlogged build of relation %u/%u/%u", (errmsg(NEON_TAG "starting unlogged build of relation %u/%u/%u",
RelFileInfoFmt(InfoFromSMgrRel(reln))))); RelFileInfoFmt(InfoFromSMgrRel(reln)))));
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
case 0: case 0:
elog(ERROR, "cannot call smgr_start_unlogged_build() on rel with unknown persistence"); neon_log(ERROR, "cannot call smgr_start_unlogged_build() on rel with unknown persistence");
break; break;
case RELPERSISTENCE_PERMANENT: case RELPERSISTENCE_PERMANENT:
@@ -2589,11 +2644,11 @@ neon_start_unlogged_build(SMgrRelation reln)
return; return;
default: default:
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
if (smgrnblocks(reln, MAIN_FORKNUM) != 0) if (smgrnblocks(reln, MAIN_FORKNUM) != 0)
elog(ERROR, "cannot perform unlogged index build, index is not empty "); neon_log(ERROR, "cannot perform unlogged index build, index is not empty ");
unlogged_build_rel = reln; unlogged_build_rel = reln;
unlogged_build_phase = UNLOGGED_BUILD_PHASE_1; unlogged_build_phase = UNLOGGED_BUILD_PHASE_1;
@@ -2620,7 +2675,7 @@ neon_finish_unlogged_build_phase_1(SMgrRelation reln)
Assert(unlogged_build_rel == reln); Assert(unlogged_build_rel == reln);
ereport(SmgrTrace, ereport(SmgrTrace,
(errmsg("finishing phase 1 of unlogged build of relation %u/%u/%u", (errmsg(NEON_TAG "finishing phase 1 of unlogged build of relation %u/%u/%u",
RelFileInfoFmt(InfoFromSMgrRel(reln))))); RelFileInfoFmt(InfoFromSMgrRel(reln)))));
if (unlogged_build_phase == UNLOGGED_BUILD_NOT_PERMANENT) if (unlogged_build_phase == UNLOGGED_BUILD_NOT_PERMANENT)
@@ -2649,7 +2704,7 @@ neon_end_unlogged_build(SMgrRelation reln)
Assert(unlogged_build_rel == reln); Assert(unlogged_build_rel == reln);
ereport(SmgrTrace, ereport(SmgrTrace,
(errmsg("ending unlogged build of relation %u/%u/%u", (errmsg(NEON_TAG "ending unlogged build of relation %u/%u/%u",
RelFileInfoFmt(InfoFromNInfoB(rinfob))))); RelFileInfoFmt(InfoFromNInfoB(rinfob)))));
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_PERMANENT) if (unlogged_build_phase != UNLOGGED_BUILD_NOT_PERMANENT)
@@ -2664,7 +2719,7 @@ neon_end_unlogged_build(SMgrRelation reln)
rinfob = InfoBFromSMgrRel(reln); rinfob = InfoBFromSMgrRel(reln);
for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++) for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++)
{ {
elog(SmgrTrace, "forgetting cached relsize for %u/%u/%u.%u", neon_log(SmgrTrace, "forgetting cached relsize for %u/%u/%u.%u",
RelFileInfoFmt(InfoFromNInfoB(rinfob)), RelFileInfoFmt(InfoFromNInfoB(rinfob)),
forknum); forknum);
@@ -2707,7 +2762,7 @@ AtEOXact_neon(XactEvent event, void *arg)
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS; unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR), (errcode(ERRCODE_INTERNAL_ERROR),
(errmsg("unlogged index build was not properly finished")))); (errmsg(NEON_TAG "unlogged index build was not properly finished"))));
} }
break; break;
} }
@@ -2806,14 +2861,14 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
set_cached_relsize(rinfo, forknum, relsize); set_cached_relsize(rinfo, forknum, relsize);
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum); SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
elog(SmgrTrace, "Set length to %d", relsize); neon_log(SmgrTrace, "Set length to %d", relsize);
} }
} }
#define FSM_TREE_DEPTH ((SlotsPerFSMPage >= 1626) ? 3 : 4) #define FSM_TREE_DEPTH ((SlotsPerFSMPage >= 1626) ? 3 : 4)
/* /*
* TODO: May be it is better to make correspondent fgunctio from freespace.c public? * TODO: May be it is better to make correspondent function from freespace.c public?
*/ */
static BlockNumber static BlockNumber
get_fsm_physical_block(BlockNumber heapblk) get_fsm_physical_block(BlockNumber heapblk)
@@ -2894,7 +2949,7 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
#if PG_VERSION_NUM < 150000 #if PG_VERSION_NUM < 150000
if (!XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno)) if (!XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno))
elog(PANIC, "failed to locate backup block with ID %d", block_id); neon_log(PANIC, "failed to locate backup block with ID %d", block_id);
#else #else
XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno); XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno);
#endif #endif

View File

@@ -1,5 +1,5 @@
{ {
"postgres-v16": "863b71572bc441581efb3bbee2ad18af037be1bb", "postgres-v16": "de8242c400f7870084861ac5796e0b5088b1898d",
"postgres-v15": "24333abb81a9ecae4541019478f0bf7d0b289df7", "postgres-v15": "a2dc225ddfc8cae1849aa2316f435c58f0333d8c",
"postgres-v14": "0bb356aa0cd1582112926fbcf0b5370222c2db6d" "postgres-v14": "03358bb0b5e0d33c238710139e768db9e75cfcc8"
} }