diff --git a/pgxn/neon/control_plane_connector.c b/pgxn/neon/control_plane_connector.c index 2546e6de5e..2e7da671f9 100644 --- a/pgxn/neon/control_plane_connector.c +++ b/pgxn/neon/control_plane_connector.c @@ -41,7 +41,7 @@ static char *ConsoleURL = NULL; static bool ForwardDDL = true; /* Curl structures for sending the HTTP requests */ -static CURL * CurlHandle; +static CURL *CurlHandle; static struct curl_slist *ContentHeader = NULL; /* @@ -54,7 +54,7 @@ typedef enum { Op_Set, /* An upsert: Either a creation or an alter */ Op_Delete, -} OpType; +} OpType; typedef struct { @@ -62,7 +62,7 @@ typedef struct Oid owner; char old_name[NAMEDATALEN]; OpType type; -} DbEntry; +} DbEntry; typedef struct { @@ -70,7 +70,7 @@ typedef struct char old_name[NAMEDATALEN]; const char *password; OpType type; -} RoleEntry; +} RoleEntry; /* * We keep one of these for each subtransaction in a stack. When a subtransaction @@ -82,10 +82,10 @@ typedef struct DdlHashTable struct DdlHashTable *prev_table; HTAB *db_table; HTAB *role_table; -} DdlHashTable; +} DdlHashTable; static DdlHashTable RootTable; -static DdlHashTable * CurrentDdlTable = &RootTable; +static DdlHashTable *CurrentDdlTable = &RootTable; static void PushKeyValue(JsonbParseState **state, char *key, char *value) @@ -199,7 +199,7 @@ typedef struct { char str[ERROR_SIZE]; size_t size; -} ErrorString; +} ErrorString; static size_t ErrorWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata) @@ -478,7 +478,7 @@ NeonXactCallback(XactEvent event, void *arg) static bool RoleIsNeonSuperuser(const char *role_name) { - return strcmp(role_name, "neon_superuser") == 0; + return strcmp(role_name, "neon_superuser") == 0; } static void @@ -509,6 +509,7 @@ HandleCreateDb(CreatedbStmt *stmt) if (downer && downer->arg) { const char *owner_name = defGetString(downer); + if (RoleIsNeonSuperuser(owner_name)) elog(ERROR, "can't create a database with owner neon_superuser"); entry->owner = get_role_oid(owner_name, false); @@ -536,6 +537,7 @@ HandleAlterOwner(AlterOwnerStmt *stmt) if (!found) memset(entry->old_name, 0, sizeof(entry->old_name)); const char *new_owner = get_rolespec_name(stmt->newowner); + if (RoleIsNeonSuperuser(new_owner)) elog(ERROR, "can't alter owner to neon_superuser"); entry->owner = get_role_oid(new_owner, false); @@ -633,6 +635,7 @@ HandleAlterRole(AlterRoleStmt *stmt) DefElem *dpass = NULL; ListCell *option; const char *role_name = stmt->role->rolename; + if (RoleIsNeonSuperuser(role_name)) elog(ERROR, "can't ALTER neon_superuser"); diff --git a/pgxn/neon/extension_server.c b/pgxn/neon/extension_server.c index 6053425de0..597eed8db5 100644 --- a/pgxn/neon/extension_server.c +++ b/pgxn/neon/extension_server.c @@ -25,79 +25,80 @@ #include -static int extension_server_port = 0; +static int extension_server_port = 0; static download_extension_file_hook_type prev_download_extension_file_hook = NULL; -// to download all SQL (and data) files for an extension: -// curl -X POST http://localhost:8080/extension_server/postgis -// it covers two possible extension files layouts: -// 1. extension_name--version--platform.sql -// 2. extension_name/extension_name--version.sql -// extension_name/extra_files.csv -// -// to download specific library file: -// curl -X POST http://localhost:8080/extension_server/postgis-3.so?is_library=true +/* to download all SQL (and data) files for an extension: */ +/* curl -X POST http://localhost:8080/extension_server/postgis */ +/* it covers two possible extension files layouts: */ +/* 1. 
extension_name--version--platform.sql */ +/* 2. extension_name/extension_name--version.sql */ +/* extension_name/extra_files.csv */ +/* */ +/* to download specific library file: */ +/* curl -X POST http://localhost:8080/extension_server/postgis-3.so?is_library=true */ static bool neon_download_extension_file_http(const char *filename, bool is_library) { - CURL *curl; - CURLcode res; - char *compute_ctl_url; - char *postdata; - bool ret = false; + CURL *curl; + CURLcode res; + char *compute_ctl_url; + char *postdata; + bool ret = false; - if ((curl = curl_easy_init()) == NULL) - { - elog(ERROR, "Failed to initialize curl handle"); - } + if ((curl = curl_easy_init()) == NULL) + { + elog(ERROR, "Failed to initialize curl handle"); + } - compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s", - extension_server_port, filename, is_library ? "?is_library=true" : ""); + compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s", + extension_server_port, filename, is_library ? "?is_library=true" : ""); - elog(LOG, "Sending request to compute_ctl: %s", compute_ctl_url); + elog(LOG, "Sending request to compute_ctl: %s", compute_ctl_url); - curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST"); - curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L /* seconds */); + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST"); + curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L /* seconds */ ); - if (curl) - { - /* Perform the request, res will get the return code */ - res = curl_easy_perform(curl); - /* Check for errors */ - if (res == CURLE_OK) - { - ret = true; - } - else - { - // Don't error here because postgres will try to find the file - // and will fail with some proper error message if it's not found. - elog(WARNING, "neon_download_extension_file_http failed: %s\n", curl_easy_strerror(res)); - } + if (curl) + { + /* Perform the request, res will get the return code */ + res = curl_easy_perform(curl); + /* Check for errors */ + if (res == CURLE_OK) + { + ret = true; + } + else + { + /* Don't error here because postgres will try to find the file */ + /* and will fail with some proper error message if it's not found. */ + elog(WARNING, "neon_download_extension_file_http failed: %s\n", curl_easy_strerror(res)); + } - /* always cleanup */ - curl_easy_cleanup(curl); - } + /* always cleanup */ + curl_easy_cleanup(curl); + } - return ret; + return ret; } -void pg_init_extension_server() +void +pg_init_extension_server() { - // Port to connect to compute_ctl on localhost - // to request extension files. - DefineCustomIntVariable("neon.extension_server_port", - "connection string to the compute_ctl", - NULL, - &extension_server_port, - 0, 0, INT_MAX, - PGC_POSTMASTER, - 0, /* no flags required */ - NULL, NULL, NULL); + /* Port to connect to compute_ctl on localhost */ + /* to request extension files. 
*/ + DefineCustomIntVariable("neon.extension_server_port", + "connection string to the compute_ctl", + NULL, + &extension_server_port, + 0, 0, INT_MAX, + PGC_POSTMASTER, + 0, /* no flags required */ + NULL, NULL, NULL); - // set download_extension_file_hook - prev_download_extension_file_hook = download_extension_file_hook; - download_extension_file_hook = neon_download_extension_file_http; + /* set download_extension_file_hook */ + prev_download_extension_file_hook = download_extension_file_hook; + download_extension_file_hook = neon_download_extension_file_http; } diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index e70f0163c0..991b553b10 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -67,32 +67,34 @@ typedef struct FileCacheEntry { BufferTag key; - uint32 hash; + uint32 hash; uint32 offset; uint32 access_count; - uint32 bitmap[BLOCKS_PER_CHUNK/32]; - dlist_node lru_node; /* LRU list node */ + uint32 bitmap[BLOCKS_PER_CHUNK / 32]; + dlist_node lru_node; /* LRU list node */ } FileCacheEntry; typedef struct FileCacheControl { - uint64 generation; /* generation is needed to handle correct hash reenabling */ - uint32 size; /* size of cache file in chunks */ - uint32 used; /* number of used chunks */ - uint32 limit; /* shared copy of lfc_size_limit */ - uint64 hits; - uint64 misses; - uint64 writes; - dlist_head lru; /* double linked list for LRU replacement algorithm */ + uint64 generation; /* generation is needed to handle correct hash + * reenabling */ + uint32 size; /* size of cache file in chunks */ + uint32 used; /* number of used chunks */ + uint32 limit; /* shared copy of lfc_size_limit */ + uint64 hits; + uint64 misses; + uint64 writes; + dlist_head lru; /* double linked list for LRU replacement + * algorithm */ } FileCacheControl; -static HTAB* lfc_hash; -static int lfc_desc = 0; +static HTAB *lfc_hash; +static int lfc_desc = 0; static LWLockId lfc_lock; -static int lfc_max_size; -static int lfc_size_limit; -static char* lfc_path; -static FileCacheControl* lfc_ctl; +static int lfc_max_size; +static int lfc_size_limit; +static char *lfc_path; +static FileCacheControl *lfc_ctl; static shmem_startup_hook_type prev_shmem_startup_hook; #if PG_VERSION_NUM>=150000 static shmem_request_hook_type prev_shmem_request_hook; @@ -100,7 +102,7 @@ static shmem_request_hook_type prev_shmem_request_hook; #define LFC_ENABLED() (lfc_ctl->limit != 0) -void PGDLLEXPORT FileCacheMonitorMain(Datum main_arg); +void PGDLLEXPORT FileCacheMonitorMain(Datum main_arg); /* * Local file cache is optional and Neon can work without it. 
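For orientation while reading the file_cache.c hunks that follow: each cache entry covers one chunk of BLOCKS_PER_CHUNK blocks, and a per-chunk bitmap tracks which blocks are actually present. A minimal sketch of the addressing, using the same mask expressions that appear below in lfc_cache_contains() and lfc_evict(); the helper name is hypothetical, and BLOCKS_PER_CHUNK is assumed to be a power of two, which the masking requires:

    /* Hypothetical helper: split a block number into the chunk tag and
     * the bitmap word/bit used by the lfc_* functions. */
    static inline void
    lfc_locate(BlockNumber blkno, BlockNumber *tag_blockno,
               int *bitmap_word, uint32 *bitmap_mask)
    {
        int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); /* offset in chunk */

        *tag_blockno = blkno & ~(BLOCKS_PER_CHUNK - 1);  /* tag.blockNum */
        *bitmap_word = chunk_offs >> 5;      /* 32 blocks per uint32 word */
        *bitmap_mask = 1u << (chunk_offs & 31);
    }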
@@ -109,9 +111,10 @@ void PGDLLEXPORT FileCacheMonitorMain(Datum main_arg); * All cache content should be invalidated to avoid reading of stale or corrupted data */ static void -lfc_disable(char const* op) +lfc_disable(char const *op) { - int fd; + int fd; + elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path); /* Invalidate hash */ @@ -120,7 +123,7 @@ lfc_disable(char const* op) if (LFC_ENABLED()) { HASH_SEQ_STATUS status; - FileCacheEntry* entry; + FileCacheEntry *entry; hash_seq_init(&status, lfc_hash); while ((entry = hash_seq_search(&status)) != NULL) @@ -135,16 +138,24 @@ lfc_disable(char const* op) if (lfc_desc > 0) { - /* If the reason of error is ENOSPC, then truncation of file may help to reclaim some space */ - int rc = ftruncate(lfc_desc, 0); + /* + * If the reason of error is ENOSPC, then truncation of file may + * help to reclaim some space + */ + int rc = ftruncate(lfc_desc, 0); + if (rc < 0) elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path); } } - /* We need to use unlink to to avoid races in LFC write, because it is not protectedby */ + + /* + * We need to use unlink to to avoid races in LFC write, because it is not + * protectedby + */ unlink(lfc_path); - fd = BasicOpenFile(lfc_path, O_RDWR|O_CREAT|O_TRUNC); + fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC); if (fd < 0) elog(WARNING, "Failed to recreate local file cache %s: %m", lfc_path); else @@ -170,13 +181,15 @@ lfc_maybe_disabled(void) static bool lfc_ensure_opened(void) { - bool enabled = !lfc_maybe_disabled(); + bool enabled = !lfc_maybe_disabled(); + /* Open cache file if not done yet */ if (lfc_desc <= 0 && enabled) { lfc_desc = BasicOpenFile(lfc_path, O_RDWR); - if (lfc_desc < 0) { + if (lfc_desc < 0) + { lfc_disable("open"); return false; } @@ -187,7 +200,7 @@ lfc_ensure_opened(void) static void lfc_shmem_startup(void) { - bool found; + bool found; static HASHCTL info; if (prev_shmem_startup_hook) @@ -197,17 +210,22 @@ lfc_shmem_startup(void) LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found); + lfc_ctl = (FileCacheControl *) ShmemInitStruct("lfc", sizeof(FileCacheControl), &found); if (!found) { - int fd; - uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size); - lfc_lock = (LWLockId)GetNamedLWLockTranche("lfc_lock"); + int fd; + uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size); + + lfc_lock = (LWLockId) GetNamedLWLockTranche("lfc_lock"); info.keysize = sizeof(BufferTag); info.entrysize = sizeof(FileCacheEntry); lfc_hash = ShmemInitHash("lfc_hash", - /* lfc_size+1 because we add new element to hash table before eviction of victim */ - lfc_size+1, lfc_size+1, + + /* + * lfc_size+1 because we add new element to hash table before eviction + * of victim + */ + lfc_size + 1, lfc_size + 1, &info, HASH_ELEM | HASH_BLOBS); lfc_ctl->generation = 0; @@ -219,7 +237,7 @@ lfc_shmem_startup(void) dlist_init(&lfc_ctl->lru); /* Recreate file cache on restart */ - fd = BasicOpenFile(lfc_path, O_RDWR|O_CREAT|O_TRUNC); + fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC); if (fd < 0) { elog(WARNING, "Failed to create local file cache %s: %m", lfc_path); @@ -242,7 +260,7 @@ lfc_shmem_request(void) prev_shmem_request_hook(); #endif - RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size)+1, sizeof(FileCacheEntry))); + RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, 
sizeof(FileCacheEntry))); RequestNamedLWLockTranche("lfc_lock", 1); } @@ -250,9 +268,11 @@ static bool is_normal_backend(void) { /* - * Stats collector detach shared memory, so we should not try to access shared memory here. - * Parallel workers first assign default value (0), so not perform truncation in parallel workers. - * The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC. + * Stats collector detach shared memory, so we should not try to access + * shared memory here. Parallel workers first assign default value (0), so + * not perform truncation in parallel workers. The Postmaster can handle + * SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), + * but has no PGPROC. */ return lfc_ctl && MyProc && UsedShmemSegAddr && !IsParallelWorker(); } @@ -271,7 +291,7 @@ lfc_check_limit_hook(int *newval, void **extra, GucSource source) static void lfc_change_limit_hook(int newval, void *extra) { - uint32 new_size = SIZE_MB_TO_CHUNKS(newval); + uint32 new_size = SIZE_MB_TO_CHUNKS(newval); if (!is_normal_backend()) return; @@ -283,11 +303,15 @@ lfc_change_limit_hook(int newval, void *extra) while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru)) { - /* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */ - FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru)); + /* + * Shrink cache by throwing away least recently accessed chunks and + * returning their space to file system + */ + FileCacheEntry *victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru)); + Assert(victim->access_count == 0); #ifdef FALLOC_FL_PUNCH_HOLE - if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0) + if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0) elog(LOG, "Failed to punch hole in file: %m"); #endif hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); @@ -314,7 +338,7 @@ lfc_init(void) "Maximal size of Neon local file cache", NULL, &lfc_max_size, - 0, /* disabled by default */ + 0, /* disabled by default */ 0, INT_MAX, PGC_POSTMASTER, @@ -327,7 +351,7 @@ lfc_init(void) "Current limit for size of Neon local file cache", NULL, &lfc_size_limit, - 0, /* disabled by default */ + 0, /* disabled by default */ 0, INT_MAX, PGC_SIGHUP, @@ -367,18 +391,18 @@ lfc_init(void) bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) { - BufferTag tag; - FileCacheEntry* entry; - int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); - bool found = false; - uint32 hash; + BufferTag tag; + FileCacheEntry *entry; + int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); + bool found = false; + uint32 hash; - if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ + if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return false; CopyNRelFileInfoToBufTag(tag, rinfo); tag.forkNum = forkNum; - tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1); + tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_SHARED); @@ -397,13 +421,13 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) { - BufferTag tag; - 
FileCacheEntry* entry; - bool found; - int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); - uint32 hash; + BufferTag tag; + FileCacheEntry *entry; + bool found; + int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); + uint32 hash; - if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ + if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return; CopyNRelFileInfoToBufTag(tag, rinfo); @@ -438,9 +462,10 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) */ if (entry->bitmap[chunk_offs >> 5] == 0) { - bool has_remaining_pages; + bool has_remaining_pages; - for (int i = 0; i < (BLOCKS_PER_CHUNK / 32); i++) { + for (int i = 0; i < (BLOCKS_PER_CHUNK / 32); i++) + { if (entry->bitmap[i] != 0) { has_remaining_pages = true; @@ -449,8 +474,8 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) } /* - * Put the entry at the position that is first to be reclaimed when - * we have no cached pages remaining in the chunk + * Put the entry at the position that is first to be reclaimed when we + * have no cached pages remaining in the chunk */ if (!has_remaining_pages) { @@ -476,16 +501,16 @@ bool lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, char *buffer) { - BufferTag tag; - FileCacheEntry* entry; - ssize_t rc; - int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); - bool result = true; - uint32 hash; - uint64 generation; - uint32 entry_offset; + BufferTag tag; + FileCacheEntry *entry; + ssize_t rc; + int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); + bool result = true; + uint32 hash; + uint64 generation; + uint32 entry_offset; - if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ + if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return false; if (!lfc_ensure_opened()) @@ -493,7 +518,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, CopyNRelFileInfoToBufTag(tag, rinfo); tag.forkNum = forkNum; - tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1); + tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); @@ -520,7 +545,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, LWLockRelease(lfc_lock); - rc = pread(lfc_desc, buffer, BLCKSZ, ((off_t)entry_offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ); + rc = pread(lfc_desc, buffer, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ); if (rc != BLCKSZ) { lfc_disable("read"); @@ -551,30 +576,30 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, * If cache is full then evict some other page. 
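One detail worth keeping in mind in the reflowed pread()/pwrite() calls in this file: the (off_t) cast sits in front of the whole offset expression. A sketch of that computation as a hypothetical helper, with the rationale as a comment (only BLOCKS_PER_CHUNK, BLCKSZ, and the expression itself come from the patch):

    /* Byte offset of one cached block inside the LFC file. Casting to
     * off_t before multiplying matters: entry offsets are uint32, and
     * entry_offset * BLOCKS_PER_CHUNK * BLCKSZ evaluated in 32-bit
     * arithmetic would wrap once the cache file exceeds 4 GB. */
    static inline off_t
    lfc_byte_offset(uint32 entry_offset, int chunk_offs)
    {
        return ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ;
    }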
*/ void -lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, + lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, #if PG_MAJORVERSION_NUM < 16 - char *buffer) + char *buffer) #else - const void *buffer) + const void *buffer) #endif { - BufferTag tag; - FileCacheEntry* entry; - ssize_t rc; - bool found; - int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); - uint32 hash; - uint64 generation; - uint32 entry_offset; + BufferTag tag; + FileCacheEntry *entry; + ssize_t rc; + bool found; + int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); + uint32 hash; + uint64 generation; + uint32 entry_offset; - if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ + if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return; if (!lfc_ensure_opened()) return; tag.forkNum = forkNum; - tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1); + tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); CopyNRelFileInfoToBufTag(tag, rinfo); hash = get_hash_value(lfc_hash, &tag); @@ -590,24 +615,30 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, if (found) { - /* Unlink entry from LRU list to pin it for the duration of IO operation */ + /* + * Unlink entry from LRU list to pin it for the duration of IO + * operation + */ if (entry->access_count++ == 0) dlist_delete(&entry->lru_node); } else { /* - * We have two choices if all cache pages are pinned (i.e. used in IO operations): - * 1. Wait until some of this operation is completed and pages is unpinned - * 2. Allocate one more chunk, so that specified cache size is more recommendation than hard limit. - * As far as probability of such event (that all pages are pinned) is considered to be very very small: - * there are should be very large number of concurrent IO operations and them are limited by max_connections, + * We have two choices if all cache pages are pinned (i.e. used in IO + * operations): 1. Wait until some of this operation is completed and + * pages is unpinned 2. Allocate one more chunk, so that specified + * cache size is more recommendation than hard limit. As far as + * probability of such event (that all pages are pinned) is considered + * to be very very small: there are should be very large number of + * concurrent IO operations and them are limited by max_connections, * we prefer not to complicate code and use second approach. 
*/ if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru)) { /* Cache overflow: evict least recently used chunk */ - FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru)); + FileCacheEntry *victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru)); + Assert(victim->access_count == 0); entry->offset = victim->offset; /* grab victim's chunk */ hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); @@ -616,7 +647,8 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, else { lfc_ctl->used += 1; - entry->offset = lfc_ctl->size++; /* allocate new chunk at end of file */ + entry->offset = lfc_ctl->size++; /* allocate new chunk at end + * of file */ } entry->access_count = 1; entry->hash = hash; @@ -628,7 +660,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, lfc_ctl->writes += 1; LWLockRelease(lfc_lock); - rc = pwrite(lfc_desc, buffer, BLCKSZ, ((off_t)entry_offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ); + rc = pwrite(lfc_desc, buffer, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ); if (rc != BLCKSZ) { lfc_disable("write"); @@ -665,13 +697,13 @@ Datum neon_get_lfc_stats(PG_FUNCTION_ARGS) { FuncCallContext *funcctx; - NeonGetStatsCtx* fctx; + NeonGetStatsCtx *fctx; MemoryContext oldcontext; TupleDesc tupledesc; Datum result; HeapTuple tuple; - char const* key; - uint64 value; + char const *key; + uint64 value; Datum values[NUM_NEON_GET_STATS_COLS]; bool nulls[NUM_NEON_GET_STATS_COLS]; @@ -683,7 +715,7 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS) oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); /* Create a user function context for cross-call persistence */ - fctx = (NeonGetStatsCtx*) palloc(sizeof(NeonGetStatsCtx)); + fctx = (NeonGetStatsCtx *) palloc(sizeof(NeonGetStatsCtx)); /* Construct a tuple descriptor for the result rows. */ tupledesc = CreateTemplateTupleDesc(NUM_NEON_GET_STATS_COLS); @@ -704,7 +736,7 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS) funcctx = SRF_PERCALL_SETUP(); /* Get the saved state */ - fctx = (NeonGetStatsCtx*) funcctx->user_fctx; + fctx = (NeonGetStatsCtx *) funcctx->user_fctx; switch (funcctx->call_cntr) { @@ -792,9 +824,9 @@ local_cache_pages(PG_FUNCTION_ARGS) if (SRF_IS_FIRSTCALL()) { - HASH_SEQ_STATUS status; - FileCacheEntry* entry; - uint32 n_pages = 0; + HASH_SEQ_STATUS status; + FileCacheEntry *entry; + uint32 n_pages = 0; funcctx = SRF_FIRSTCALL_INIT(); @@ -851,7 +883,7 @@ local_cache_pages(PG_FUNCTION_ARGS) hash_seq_init(&status, lfc_hash); while ((entry = hash_seq_search(&status)) != NULL) { - for (int i = 0; i < BLOCKS_PER_CHUNK/32; i++) + for (int i = 0; i < BLOCKS_PER_CHUNK / 32; i++) n_pages += pg_popcount32(entry->bitmap[i]); } } @@ -870,10 +902,11 @@ local_cache_pages(PG_FUNCTION_ARGS) if (n_pages != 0) { /* - * Scan through all the cache entries, saving the relevant fields in the - * fctx->record structure. + * Scan through all the cache entries, saving the relevant fields + * in the fctx->record structure. 
*/ - uint32 n = 0; + uint32 n = 0; + hash_seq_init(&status, lfc_hash); while ((entry = hash_seq_search(&status)) != NULL) { @@ -881,7 +914,7 @@ local_cache_pages(PG_FUNCTION_ARGS) { if (entry->bitmap[i >> 5] & (1 << (i & 31))) { - fctx->record[n].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i; + fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i; fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)); fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key)); fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key)); diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 8eb9ebb915..16406ce8a3 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -69,9 +69,9 @@ int max_reconnect_attempts = 60; typedef struct { - LWLockId lock; - pg_atomic_uint64 update_counter; - char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE]; + LWLockId lock; + pg_atomic_uint64 update_counter; + char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE]; } PagestoreShmemState; #if PG_VERSION_NUM >= 150000 @@ -83,7 +83,7 @@ static PagestoreShmemState *pagestore_shared; static uint64 pagestore_local_counter = 0; static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE]; -bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL; +bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL; static bool pageserver_flush(void); static void pageserver_disconnect(void); @@ -91,43 +91,43 @@ static void pageserver_disconnect(void); static bool PagestoreShmemIsValid() { - return pagestore_shared && UsedShmemSegAddr; + return pagestore_shared && UsedShmemSegAddr; } static bool CheckPageserverConnstring(char **newval, void **extra, GucSource source) { - return strlen(*newval) < MAX_PAGESERVER_CONNSTRING_SIZE; + return strlen(*newval) < MAX_PAGESERVER_CONNSTRING_SIZE; } static void AssignPageserverConnstring(const char *newval, void *extra) { - if(!PagestoreShmemIsValid()) - return; - LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE); - strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE); - pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1); - LWLockRelease(pagestore_shared->lock); + if (!PagestoreShmemIsValid()) + return; + LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE); + strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE); + pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1); + LWLockRelease(pagestore_shared->lock); } static bool CheckConnstringUpdated() { - if(!PagestoreShmemIsValid()) - return false; - return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter); + if (!PagestoreShmemIsValid()) + return false; + return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter); } static void ReloadConnstring() { - if(!PagestoreShmemIsValid()) - return; - LWLockAcquire(pagestore_shared->lock, LW_SHARED); - strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring)); - pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter); - LWLockRelease(pagestore_shared->lock); + if (!PagestoreShmemIsValid()) + return; + LWLockAcquire(pagestore_shared->lock, LW_SHARED); + strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring)); + pagestore_local_counter = 
pg_atomic_read_u64(&pagestore_shared->update_counter); + LWLockRelease(pagestore_shared->lock); } static bool @@ -141,21 +141,20 @@ pageserver_connect(int elevel) Assert(!connected); - if(CheckConnstringUpdated()) - { - ReloadConnstring(); - } + if (CheckConnstringUpdated()) + { + ReloadConnstring(); + } /* * Connect using the connection string we got from the * neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment * variable was set, use that as the password. * - * The connection options are parsed in the order they're given, so - * when we set the password before the connection string, the - * connection string can override the password from the env variable. - * Seems useful, although we don't currently use that capability - * anywhere. + * The connection options are parsed in the order they're given, so when + * we set the password before the connection string, the connection string + * can override the password from the env variable. Seems useful, although + * we don't currently use that capability anywhere. */ n = 0; if (neon_auth_token) @@ -198,9 +197,9 @@ pageserver_connect(int elevel) pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3); AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET, - MyLatch, NULL); + MyLatch, NULL); AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, - NULL, NULL); + NULL, NULL); AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL); while (PQisBusy(pageserver_conn)) @@ -265,6 +264,7 @@ retry: if (!PQconsumeInput(pageserver_conn)) { char *msg = pchomp(PQerrorMessage(pageserver_conn)); + neon_log(LOG, "could not get response from pageserver: %s", msg); pfree(msg); return -1; @@ -305,15 +305,15 @@ pageserver_disconnect(void) } static bool -pageserver_send(NeonRequest * request) +pageserver_send(NeonRequest *request) { StringInfoData req_buff; - if(CheckConnstringUpdated()) - { - pageserver_disconnect(); - ReloadConnstring(); - } + if (CheckConnstringUpdated()) + { + pageserver_disconnect(); + ReloadConnstring(); + } /* If the connection was lost for some reason, reconnect */ if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD) @@ -326,10 +326,12 @@ pageserver_send(NeonRequest * request) /* * If pageserver is stopped, the connections from compute node are broken. - * The compute node doesn't notice that immediately, but it will cause the next request to fail, usually on the next query. - * That causes user-visible errors if pageserver is restarted, or the tenant is moved from one pageserver to another. - * See https://github.com/neondatabase/neon/issues/1138 - * So try to reestablish connection in case of failure. + * The compute node doesn't notice that immediately, but it will cause the + * next request to fail, usually on the next query. That causes + * user-visible errors if pageserver is restarted, or the tenant is moved + * from one pageserver to another. See + * https://github.com/neondatabase/neon/issues/1138 So try to reestablish + * connection in case of failure. 
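The CheckPageserverConnstring()/AssignPageserverConnstring() pair reformatted above only takes effect once installed as GUC hooks. For context, a sketch of that registration, which pg_init_libpagestore() performs further down in this patch; the default value, GUC context, and flags shown here are illustrative rather than quoted from the source:

    DefineCustomStringVariable("neon.pageserver_connstring",
                               "connection string to the page server",
                               NULL,
                               &page_server_connstring,
                               "",            /* illustrative default */
                               PGC_SIGHUP,    /* illustrative context */
                               0,
                               CheckPageserverConnstring,  /* length check */
                               AssignPageserverConnstring, /* copy to shmem
                                                            * + bump counter */
                               NULL);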
*/ if (!connected) { @@ -353,6 +355,7 @@ pageserver_send(NeonRequest * request) if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0) { char *msg = pchomp(PQerrorMessage(pageserver_conn)); + pageserver_disconnect(); neon_log(LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg); pfree(msg); @@ -410,7 +413,8 @@ pageserver_receive(void) } else if (rc == -2) { - char* msg = pchomp(PQerrorMessage(pageserver_conn)); + char *msg = pchomp(PQerrorMessage(pageserver_conn)); + pageserver_disconnect(); neon_log(ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg); } @@ -444,6 +448,7 @@ pageserver_flush(void) if (PQflush(pageserver_conn)) { char *msg = pchomp(PQerrorMessage(pageserver_conn)); + pageserver_disconnect(); neon_log(LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg); pfree(msg); @@ -471,46 +476,47 @@ check_neon_id(char **newval, void **extra, GucSource source) static Size PagestoreShmemSize(void) { - return sizeof(PagestoreShmemState); + return sizeof(PagestoreShmemState); } static bool PagestoreShmemInit(void) { - bool found; - LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - pagestore_shared = ShmemInitStruct("libpagestore shared state", - PagestoreShmemSize(), - &found); - if(!found) - { - pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock); - pg_atomic_init_u64(&pagestore_shared->update_counter, 0); - AssignPageserverConnstring(page_server_connstring, NULL); - } - LWLockRelease(AddinShmemInitLock); - return found; + bool found; + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + pagestore_shared = ShmemInitStruct("libpagestore shared state", + PagestoreShmemSize(), + &found); + if (!found) + { + pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock); + pg_atomic_init_u64(&pagestore_shared->update_counter, 0); + AssignPageserverConnstring(page_server_connstring, NULL); + } + LWLockRelease(AddinShmemInitLock); + return found; } static void pagestore_shmem_startup_hook(void) { - if(prev_shmem_startup_hook) - prev_shmem_startup_hook(); + if (prev_shmem_startup_hook) + prev_shmem_startup_hook(); - PagestoreShmemInit(); + PagestoreShmemInit(); } static void pagestore_shmem_request(void) { #if PG_VERSION_NUM >= 150000 - if(prev_shmem_request_hook) - prev_shmem_request_hook(); + if (prev_shmem_request_hook) + prev_shmem_request_hook(); #endif - RequestAddinShmemSpace(PagestoreShmemSize()); - RequestNamedLWLockTranche("neon_libpagestore", 1); + RequestAddinShmemSpace(PagestoreShmemSize()); + RequestNamedLWLockTranche("neon_libpagestore", 1); } static void @@ -520,7 +526,7 @@ pagestore_prepare_shmem(void) prev_shmem_request_hook = shmem_request_hook; shmem_request_hook = pagestore_shmem_request; #else - pagestore_shmem_request(); + pagestore_shmem_request(); #endif prev_shmem_startup_hook = shmem_startup_hook; shmem_startup_hook = pagestore_shmem_startup_hook; @@ -532,7 +538,7 @@ pagestore_prepare_shmem(void) void pg_init_libpagestore(void) { - pagestore_prepare_shmem(); + pagestore_prepare_shmem(); DefineCustomStringVariable("neon.pageserver_connstring", "connection string to the page server", @@ -607,7 +613,10 @@ pg_init_libpagestore(void) neon_log(PageStoreTrace, "libpagestore already loaded"); page_server = &api; - /* Retrieve the auth token to use when connecting to pageserver and safekeepers */ + /* + * Retrieve the auth token to use when connecting to pageserver and + * safekeepers + */ neon_auth_token = 
getenv("NEON_AUTH_TOKEN"); if (neon_auth_token) neon_log(LOG, "using storage auth token from NEON_AUTH_TOKEN environment variable"); diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 4850b0d6a1..6a8f8cca70 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -48,9 +48,9 @@ _PG_init(void) pg_init_extension_server(); - // Important: This must happen after other parts of the extension - // are loaded, otherwise any settings to GUCs that were set before - // the extension was loaded will be removed. + /* Important: This must happen after other parts of the extension */ + /* are loaded, otherwise any settings to GUCs that were set before */ + /* the extension was loaded will be removed. */ EmitWarningsOnPlaceholders("neon"); } diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index 3300c67456..897a8373a1 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -32,7 +32,7 @@ extern void pg_init_extension_server(void); * block_id; false otherwise. */ extern bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id); -extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id); +extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id); extern uint64 BackpressureThrottlingTime(void); extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn); diff --git a/pgxn/neon/neon_pgversioncompat.h b/pgxn/neon/neon_pgversioncompat.h index 8db0d5341e..d38ef48910 100644 --- a/pgxn/neon/neon_pgversioncompat.h +++ b/pgxn/neon/neon_pgversioncompat.h @@ -59,7 +59,7 @@ #define DropRelationAllLocalBuffers DropRelFileNodeAllLocalBuffers -#else /* major version >= 16 */ +#else /* major version >= 16 */ #define USE_RELFILELOCATOR @@ -109,4 +109,4 @@ #define DropRelationAllLocalBuffers DropRelationAllLocalBuffers #endif -#endif //NEON_PGVERSIONCOMPAT_H +#endif /* //NEON_PGVERSIONCOMPAT_H */ diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index d61f74b5c8..ecfadb01d6 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -40,13 +40,13 @@ typedef enum T_NeonGetPageResponse, T_NeonErrorResponse, T_NeonDbSizeResponse, -} NeonMessageTag; +} NeonMessageTag; /* base struct for c-style inheritance */ typedef struct { NeonMessageTag tag; -} NeonMessage; +} NeonMessage; #define messageTag(m) (((const NeonMessage *)(m))->tag) @@ -67,27 +67,27 @@ typedef struct NeonMessageTag tag; bool latest; /* if true, request latest page version */ XLogRecPtr lsn; /* request page version @ this LSN */ -} NeonRequest; +} NeonRequest; typedef struct { NeonRequest req; NRelFileInfo rinfo; ForkNumber forknum; -} NeonExistsRequest; +} NeonExistsRequest; typedef struct { NeonRequest req; NRelFileInfo rinfo; ForkNumber forknum; -} NeonNblocksRequest; +} NeonNblocksRequest; typedef struct { NeonRequest req; Oid dbNode; -} NeonDbSizeRequest; +} NeonDbSizeRequest; typedef struct { @@ -95,31 +95,31 @@ typedef struct NRelFileInfo rinfo; ForkNumber forknum; BlockNumber blkno; -} NeonGetPageRequest; +} NeonGetPageRequest; /* supertype of all the Neon*Response structs below */ typedef struct { NeonMessageTag tag; -} NeonResponse; +} NeonResponse; typedef struct { NeonMessageTag tag; bool exists; -} NeonExistsResponse; +} NeonExistsResponse; typedef struct { NeonMessageTag tag; uint32 n_blocks; -} NeonNblocksResponse; +} NeonNblocksResponse; typedef struct { NeonMessageTag tag; char page[FLEXIBLE_ARRAY_MEMBER]; -} NeonGetPageResponse; +} NeonGetPageResponse; #define PS_GETPAGERESPONSE_SIZE 
(MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ)) @@ -127,18 +127,18 @@ typedef struct { NeonMessageTag tag; int64 db_size; -} NeonDbSizeResponse; +} NeonDbSizeResponse; typedef struct { NeonMessageTag tag; char message[FLEXIBLE_ARRAY_MEMBER]; /* null-terminated error * message */ -} NeonErrorResponse; +} NeonErrorResponse; -extern StringInfoData nm_pack_request(NeonRequest * msg); -extern NeonResponse * nm_unpack_response(StringInfo s); -extern char *nm_to_string(NeonMessage * msg); +extern StringInfoData nm_pack_request(NeonRequest *msg); +extern NeonResponse *nm_unpack_response(StringInfo s); +extern char *nm_to_string(NeonMessage *msg); /* * API @@ -146,20 +146,20 @@ extern char *nm_to_string(NeonMessage * msg); typedef struct { - bool (*send) (NeonRequest * request); + bool (*send) (NeonRequest *request); NeonResponse *(*receive) (void); bool (*flush) (void); -} page_server_api; +} page_server_api; extern void prefetch_on_ps_disconnect(void); -extern page_server_api * page_server; +extern page_server_api *page_server; extern char *page_server_connstring; -extern int flush_every_n_requests; -extern int readahead_buffer_size; +extern int flush_every_n_requests; +extern int readahead_buffer_size; extern bool seqscan_prefetch_enabled; -extern int seqscan_prefetch_distance; +extern int seqscan_prefetch_distance; extern char *neon_timeline; extern char *neon_tenant; extern bool wal_redo; @@ -194,14 +194,14 @@ extern bool neon_prefetch(SMgrRelation reln, ForkNumber forknum, extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer); extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno, - XLogRecPtr request_lsn, bool request_latest, char *buffer); + XLogRecPtr request_lsn, bool request_latest, char *buffer); extern void neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync); #else extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, void *buffer); extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno, - XLogRecPtr request_lsn, bool request_latest, void *buffer); + XLogRecPtr request_lsn, bool request_latest, void *buffer); extern void neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void *buffer, bool skipFsync); #endif diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 74ffbdb371..c9c41cd1cc 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -101,21 +101,21 @@ typedef enum UNLOGGED_BUILD_PHASE_1, UNLOGGED_BUILD_PHASE_2, UNLOGGED_BUILD_NOT_PERMANENT -} UnloggedBuildPhase; +} UnloggedBuildPhase; static SMgrRelation unlogged_build_rel = NULL; static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS; /* * Prefetch implementation: - * + * * Prefetch is performed locally by each backend. * * There can be up to readahead_buffer_size active IO requests registered at * any time. Requests using smgr_prefetch are sent to the pageserver, but we * don't wait on the response. Requests using smgr_read are either read from * the buffer, or (if that's not possible) we wait on the response to arrive - - * this also will allow us to receive other prefetched pages. + * this also will allow us to receive other prefetched pages. 
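A condensed sketch of the read path this comment block describes, pieced together from functions that appear later in the patch (simplified: the real neon_read_at_lsn() also revalidates the slot's LSN and copes with errors and reconnects):

    /* Register the page; this reuses an in-flight prefetch if one fits. */
    ring_index = prefetch_register_buffer(buftag, &request_latest,
                                          &request_lsn);
    /* Block until this slot's response arrives; the caller must have
     * flushed the request buffer first. */
    prefetch_wait_for(ring_index);
    slot = GetPrfSlot(ring_index);
    /* ... copy the page out of slot->response ... */
    prefetch_set_unused(ring_index);    /* return the slot to the ring */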
* Each request is immediately written to the output buffer of the pageserver * connection, but may not be flushed if smgr_prefetch is used: pageserver * flushes sent requests on manual flush, or every neon.flush_output_after @@ -139,7 +139,7 @@ static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS; /* * State machine: - * + * * not in hash : in hash * : * UNUSED ------> REQUESTED --> RECEIVED @@ -150,30 +150,34 @@ static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS; * +----------------+------------+ * : */ -typedef enum PrefetchStatus { - PRFS_UNUSED = 0, /* unused slot */ - PRFS_REQUESTED, /* request was written to the sendbuffer to PS, but not - * necessarily flushed. - * all fields except response valid */ - PRFS_RECEIVED, /* all fields valid */ - PRFS_TAG_REMAINS, /* only buftag and my_ring_index are still valid */ +typedef enum PrefetchStatus +{ + PRFS_UNUSED = 0, /* unused slot */ + PRFS_REQUESTED, /* request was written to the sendbuffer to + * PS, but not necessarily flushed. all fields + * except response valid */ + PRFS_RECEIVED, /* all fields valid */ + PRFS_TAG_REMAINS, /* only buftag and my_ring_index are still + * valid */ } PrefetchStatus; -typedef struct PrefetchRequest { - BufferTag buftag; /* must be first entry in the struct */ +typedef struct PrefetchRequest +{ + BufferTag buftag; /* must be first entry in the struct */ XLogRecPtr effective_request_lsn; XLogRecPtr actual_request_lsn; - NeonResponse *response; /* may be null */ + NeonResponse *response; /* may be null */ PrefetchStatus status; uint64 my_ring_index; } PrefetchRequest; /* prefetch buffer lookup hash table */ -typedef struct PrfHashEntry { +typedef struct PrfHashEntry +{ PrefetchRequest *slot; - uint32 status; - uint32 hash; + uint32 status; + uint32 hash; } PrfHashEntry; #define SH_PREFIX prfh @@ -197,36 +201,42 @@ typedef struct PrfHashEntry { /* * PrefetchState maintains the state of (prefetch) getPage@LSN requests. * It maintains a (ring) buffer of in-flight requests and responses. - * + * * We maintain several indexes into the ring buffer: * ring_unused >= ring_flush >= ring_receive >= ring_last >= 0 - * + * * ring_unused points to the first unused slot of the buffer * ring_receive is the next request that is to be received * ring_last is the oldest received entry in the buffer - * + * * Apart from being an entry in the ring buffer of prefetch requests, each * PrefetchRequest that is not UNUSED is indexed in prf_hash by buftag. 
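The indexing invariant above is compact enough to restate as code. A hypothetical accessor equivalent to what the patch's GetPrfSlot() appears to do (the modulo mapping is visible later in prefetch_register_buffer()):

    /* ring_unused >= ring_flush >= ring_receive >= ring_last >= 0.
     * Logical ring indexes grow without bound and wrap onto the
     * fixed-size prf_buffer array. */
    static inline PrefetchRequest *
    prf_slot(PrefetchState *ps, uint64 ring_index)
    {
        Assert(ps->ring_last <= ring_index && ring_index < ps->ring_unused);
        return &ps->prf_buffer[ring_index % readahead_buffer_size];
    }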
*/ -typedef struct PrefetchState { - MemoryContext bufctx; /* context for prf_buffer[].response allocations */ - MemoryContext errctx; /* context for prf_buffer[].response allocations */ - MemoryContext hashctx; /* context for prf_buffer */ +typedef struct PrefetchState +{ + MemoryContext bufctx; /* context for prf_buffer[].response + * allocations */ + MemoryContext errctx; /* context for prf_buffer[].response + * allocations */ + MemoryContext hashctx; /* context for prf_buffer */ /* buffer indexes */ - uint64 ring_unused; /* first unused slot */ - uint64 ring_flush; /* next request to flush */ - uint64 ring_receive; /* next slot that is to receive a response */ - uint64 ring_last; /* min slot with a response value */ + uint64 ring_unused; /* first unused slot */ + uint64 ring_flush; /* next request to flush */ + uint64 ring_receive; /* next slot that is to receive a response */ + uint64 ring_last; /* min slot with a response value */ /* metrics / statistics */ - int n_responses_buffered; /* count of PS responses not yet in buffers */ - int n_requests_inflight; /* count of PS requests considered in flight */ - int n_unused; /* count of buffers < unused, > last, that are also unused */ + int n_responses_buffered; /* count of PS responses not yet in + * buffers */ + int n_requests_inflight; /* count of PS requests considered in + * flight */ + int n_unused; /* count of buffers < unused, > last, that are + * also unused */ /* the buffers */ - prfh_hash *prf_hash; - PrefetchRequest prf_buffer[]; /* prefetch buffers */ + prfh_hash *prf_hash; + PrefetchRequest prf_buffer[]; /* prefetch buffers */ } PrefetchState; PrefetchState *MyPState; @@ -264,10 +274,10 @@ static XLogRecPtr neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, static bool compact_prefetch_buffers(void) { - uint64 empty_ring_index = MyPState->ring_last; - uint64 search_ring_index = MyPState->ring_receive; - int n_moved = 0; - + uint64 empty_ring_index = MyPState->ring_last; + uint64 search_ring_index = MyPState->ring_receive; + int n_moved = 0; + if (MyPState->ring_receive == MyPState->ring_last) return false; @@ -282,15 +292,14 @@ compact_prefetch_buffers(void) } /* - * Here we have established: - * slots < search_ring_index have an unknown state (not scanned) - * slots >= search_ring_index and <= empty_ring_index are unused - * slots > empty_ring_index are in use, or outside our buffer's range. - * ... unless search_ring_index <= ring_last - * + * Here we have established: slots < search_ring_index have an unknown + * state (not scanned) slots >= search_ring_index and <= empty_ring_index + * are unused slots > empty_ring_index are in use, or outside our buffer's + * range. ... unless search_ring_index <= ring_last + * * Therefore, there is a gap of at least one unused items between - * search_ring_index and empty_ring_index (both inclusive), which grows as we hit - * more unused items while moving backwards through the array. + * search_ring_index and empty_ring_index (both inclusive), which grows as + * we hit more unused items while moving backwards through the array. 
*/ while (search_ring_index > MyPState->ring_last) @@ -330,7 +339,10 @@ compact_prefetch_buffers(void) /* empty the moved slot */ source_slot->status = PRFS_UNUSED; - source_slot->buftag = (BufferTag) {0}; + source_slot->buftag = (BufferTag) + { + 0 + }; source_slot->response = NULL; source_slot->my_ring_index = 0; source_slot->effective_request_lsn = 0; @@ -340,8 +352,8 @@ compact_prefetch_buffers(void) } /* - * Only when we've moved slots we can expect trailing unused slots, - * so only then we clean up trailing unused slots. + * Only when we've moved slots we can expect trailing unused slots, so + * only then we clean up trailing unused slots. */ if (n_moved > 0) { @@ -358,10 +370,10 @@ readahead_buffer_resize(int newsize, void *extra) uint64 end, nfree = newsize; PrefetchState *newPState; - Size newprfs_size = offsetof(PrefetchState, prf_buffer) + ( - sizeof(PrefetchRequest) * newsize - ); - + Size newprfs_size = offsetof(PrefetchState, prf_buffer) + ( + sizeof(PrefetchRequest) * newsize + ); + /* don't try to re-initialize if we haven't initialized yet */ if (MyPState == NULL) return; @@ -388,12 +400,12 @@ readahead_buffer_resize(int newsize, void *extra) newPState->ring_receive = newsize; newPState->ring_flush = newsize; - /* + /* * Copy over the prefetches. - * + * * We populate the prefetch array from the end; to retain the most recent - * prefetches, but this has the benefit of only needing to do one iteration - * on the dataset, and trivial compaction. + * prefetches, but this has the benefit of only needing to do one + * iteration on the dataset, and trivial compaction. */ for (end = MyPState->ring_unused - 1; end >= MyPState->ring_last && end != UINT64_MAX && nfree != 0; @@ -401,7 +413,7 @@ readahead_buffer_resize(int newsize, void *extra) { PrefetchRequest *slot = GetPrfSlot(end); PrefetchRequest *newslot; - bool found; + bool found; if (slot->status == PRFS_UNUSED) continue; @@ -464,10 +476,11 @@ consume_prefetch_responses(void) static void prefetch_cleanup_trailing_unused(void) { - uint64 ring_index; + uint64 ring_index; PrefetchRequest *slot; - while (MyPState->ring_last < MyPState->ring_receive) { + while (MyPState->ring_last < MyPState->ring_receive) + { ring_index = MyPState->ring_last; slot = GetPrfSlot(ring_index); @@ -481,7 +494,7 @@ prefetch_cleanup_trailing_unused(void) /* * Wait for slot of ring_index to have received its response. * The caller is responsible for making sure the request buffer is flushed. - * + * * NOTE: this function may indirectly update MyPState->pfs_hash; which * invalidates any active pointers into the hash table. */ @@ -513,7 +526,7 @@ prefetch_wait_for(uint64 ring_index) /* * Read the response of a prefetch request into its slot. - * + * * The caller is responsible for making sure that the request for this buffer * was flushed to the PageServer. * @@ -553,7 +566,7 @@ prefetch_read(PrefetchRequest *slot) /* * Disconnect hook - drop prefetches when the connection drops - * + * * If we don't remove the failed prefetches, we'd be serving incorrect * data to the smgr. 
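One subtlety in the readahead_buffer_resize() loop above deserves a note: the ring indexes are unsigned, so `end >= MyPState->ring_last` can never become false when ring_last is 0; the `end != UINT64_MAX` test is what stops the backwards scan after index 0 decrements and wraps. A stripped-down sketch of the same idiom:

    /* Walk a uint64 range [low, high] backwards without underflowing:
     * when i reaches 0 and wraps to UINT64_MAX, the second test exits. */
    for (uint64 i = high; i >= low && i != UINT64_MAX; i--)
    {
        /* ... visit slot i ... */
    }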
*/ @@ -564,7 +577,7 @@ prefetch_on_ps_disconnect(void) while (MyPState->ring_receive < MyPState->ring_unused) { PrefetchRequest *slot; - uint64 ring_index = MyPState->ring_receive; + uint64 ring_index = MyPState->ring_receive; slot = GetPrfSlot(ring_index); @@ -594,7 +607,7 @@ prefetch_set_unused(uint64 ring_index) PrefetchRequest *slot = GetPrfSlot(ring_index); if (ring_index < MyPState->ring_last) - return; /* Should already be unused */ + return; /* Should already be unused */ Assert(MyPState->ring_unused > ring_index); @@ -625,7 +638,11 @@ prefetch_set_unused(uint64 ring_index) /* run cleanup if we're holding back ring_last */ if (MyPState->ring_last == ring_index) prefetch_cleanup_trailing_unused(); - /* ... and try to store the buffered responses more compactly if > 12.5% of the buffer is gaps */ + + /* + * ... and try to store the buffered responses more compactly if > 12.5% + * of the buffer is gaps + */ else if (ReceiveBufferNeedsCompaction()) compact_prefetch_buffers(); } @@ -633,7 +650,7 @@ prefetch_set_unused(uint64 ring_index) static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn) { - bool found; + bool found; NeonGetPageRequest request = { .req.tag = T_NeonGetPageRequest, .req.latest = false, @@ -651,21 +668,22 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force } else { - XLogRecPtr lsn = neon_get_request_lsn( - &request.req.latest, - BufTagGetNRelFileInfo(slot->buftag), - slot->buftag.forkNum, - slot->buftag.blockNum - ); + XLogRecPtr lsn = neon_get_request_lsn( + &request.req.latest, + BufTagGetNRelFileInfo(slot->buftag), + slot->buftag.forkNum, + slot->buftag.blockNum + ); + /* - * Note: effective_request_lsn is potentially higher than the requested - * LSN, but still correct: - * + * Note: effective_request_lsn is potentially higher than the + * requested LSN, but still correct: + * * We know there are no changes between the actual requested LSN and * the value of effective_request_lsn: If there were, the page would - * have been in cache and evicted between those LSN values, which - * then would have had to result in a larger request LSN for this page. - * + * have been in cache and evicted between those LSN values, which then + * would have had to result in a larger request LSN for this page. + * * It is possible that a concurrent backend loads the page, modifies * it and then evicts it again, but the LSN of that eviction cannot be * smaller than the current WAL insert/redo pointer, which is already @@ -702,7 +720,7 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force * prefetch_register_buffer() - register and prefetch buffer * * Register that we may want the contents of BufferTag in the near future. - * + * * If force_latest and force_lsn are not NULL, those values are sent to the * pageserver. If they are NULL, we utilize the lastWrittenLsn -infrastructure * to fill in these values manually. 
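A gloss on the LSN-matching rules in the hunk that follows, going by the patch's own comments: a force_latest request can reuse a slot whose effective_request_lsn is at or below the requested LSN, while a request for a specific LSN matches only a slot with exactly that effective_request_lsn; any other combination forces the prefetch to be dropped and re-issued. Condensed (the `acceptable` flag is hypothetical; the elided branch bodies in the hunk handle the mismatch cases):

    if (*force_latest)
        /* latest read: any effective_request_lsn <= request LSN is OK */
        acceptable = (slot->effective_request_lsn <= *force_lsn);
    else
        /* historical read: an exact LSN match is required */
        acceptable = (slot->effective_request_lsn == *force_lsn);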
@@ -714,14 +732,14 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force static uint64 prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn) { - uint64 ring_index; + uint64 ring_index; PrefetchRequest req; PrefetchRequest *slot; PrfHashEntry *entry; /* use an intermediate PrefetchRequest struct to ensure correct alignment */ req.buftag = tag; - Retry: +Retry: entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &req); if (entry != NULL) @@ -741,7 +759,10 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls */ if (force_latest && force_lsn) { - /* if we want the latest version, any effective_request_lsn < request lsn is OK */ + /* + * if we want the latest version, any effective_request_lsn < + * request lsn is OK + */ if (*force_latest) { if (*force_lsn > slot->effective_request_lsn) @@ -752,7 +773,11 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls } } - /* if we don't want the latest version, only accept requests with the exact same LSN */ + + /* + * if we don't want the latest version, only accept requests with + * the exact same LSN + */ else { if (*force_lsn != slot->effective_request_lsn) @@ -799,7 +824,8 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls */ if (MyPState->ring_last + readahead_buffer_size - 1 == MyPState->ring_unused) { - uint64 cleanup_index = MyPState->ring_last; + uint64 cleanup_index = MyPState->ring_last; + slot = GetPrfSlot(cleanup_index); Assert(slot->status != PRFS_UNUSED); @@ -814,7 +840,10 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls } else { - /* We have the slot for ring_last, so that must still be in progress */ + /* + * We have the slot for ring_last, so that must still be in + * progress + */ switch (slot->status) { case PRFS_REQUESTED: @@ -833,8 +862,8 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls } /* - * The next buffer pointed to by `ring_unused` is now definitely empty, - * so we can insert the new request to it. + * The next buffer pointed to by `ring_unused` is now definitely empty, so + * we can insert the new request to it. 
*/ ring_index = MyPState->ring_unused; slot = &MyPState->prf_buffer[((ring_index) % readahead_buffer_size)]; @@ -860,7 +889,10 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls { if (!page_server->flush()) { - /* Prefetch set is reset in case of error, so we should try to register our request once again */ + /* + * Prefetch set is reset in case of error, so we should try to + * register our request once again + */ goto Retry; } MyPState->ring_flush = MyPState->ring_unused; @@ -872,8 +904,10 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls static NeonResponse * page_server_request(void const *req) { - NeonResponse* resp; - do { + NeonResponse *resp; + + do + { while (!page_server->send((NeonRequest *) req) || !page_server->flush()); MyPState->ring_flush = MyPState->ring_unused; consume_prefetch_responses(); @@ -885,7 +919,7 @@ page_server_request(void const *req) StringInfoData -nm_pack_request(NeonRequest * msg) +nm_pack_request(NeonRequest *msg) { StringInfoData s; @@ -1001,7 +1035,7 @@ nm_unpack_response(StringInfo s) /* XXX: should be varlena */ memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ); pq_getmsgend(s); - + Assert(msg_resp->tag == T_NeonGetPageResponse); resp = (NeonResponse *) msg_resp; @@ -1057,7 +1091,7 @@ nm_unpack_response(StringInfo s) /* dump to json for debugging / error reporting purposes */ char * -nm_to_string(NeonMessage * msg) +nm_to_string(NeonMessage *msg) { StringInfoData s; @@ -1186,7 +1220,7 @@ nm_to_string(NeonMessage * msg) * directly because it skips the logging if the LSN is new enough. */ static XLogRecPtr -log_newpage_copy(NRelFileInfo *rinfo, ForkNumber forkNum, BlockNumber blkno, +log_newpage_copy(NRelFileInfo * rinfo, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std) { PGAlignedBlock copied_buffer; @@ -1209,11 +1243,11 @@ PageIsEmptyHeapPage(char *buffer) } static void -neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, #if PG_MAJORVERSION_NUM < 16 - char *buffer, bool force) + char *buffer, bool force) #else - const char *buffer, bool force) + const char *buffer, bool force) #endif { XLogRecPtr lsn = PageGetLSN((Page) buffer); @@ -1313,24 +1347,24 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, void neon_init(void) { - Size prfs_size; + Size prfs_size; if (MyPState != NULL) return; prfs_size = offsetof(PrefetchState, prf_buffer) + ( - sizeof(PrefetchRequest) * readahead_buffer_size - ); + sizeof(PrefetchRequest) * readahead_buffer_size + ); MyPState = MemoryContextAllocZero(TopMemoryContext, prfs_size); - + MyPState->n_unused = readahead_buffer_size; MyPState->bufctx = SlabContextCreate(TopMemoryContext, "NeonSMGR/prefetch", SLAB_DEFAULT_BLOCK_SIZE * 17, PS_GETPAGERESPONSE_SIZE); - MyPState->errctx = AllocSetContextCreate(TopMemoryContext, + MyPState->errctx = AllocSetContextCreate(TopMemoryContext, "NeonSMGR/errors", ALLOCSET_DEFAULT_SIZES); MyPState->hashctx = AllocSetContextCreate(TopMemoryContext, @@ -1570,14 +1604,14 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo) /* * Newly created relation is empty, remember that in the relsize cache. * - * Note that in REDO, this is called to make sure the relation fork exists, - * but it does not truncate the relation. So, we can only update the - * relsize if it didn't exist before. 
@@ -1570,14 +1604,14 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
 	/*
 	 * Newly created relation is empty, remember that in the relsize cache.
 	 *
-	 * Note that in REDO, this is called to make sure the relation fork exists,
-	 * but it does not truncate the relation. So, we can only update the
-	 * relsize if it didn't exist before.
-	 *
+	 * Note that in REDO, this is called to make sure the relation fork
+	 * exists, but it does not truncate the relation. So, we can only update
+	 * the relsize if it didn't exist before.
+	 *
 	 * Also, in redo, we must make sure to update the cached size of the
-	 * relation, as that is the primary source of truth for REDO's
-	 * file length considerations, and as file extension isn't (perfectly)
-	 * logged, we need to take care of that before we hit file size checks.
+	 * relation, as that is the primary source of truth for REDO's file length
+	 * considerations, and as file extension isn't (perfectly) logged, we need
+	 * to take care of that before we hit file size checks.
 	 *
 	 * FIXME: This is currently not just an optimization, but required for
 	 * correctness. Postgres can call smgrnblocks() on the newly-created
@@ -1653,7 +1687,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
 #endif
 {
 	XLogRecPtr	lsn;
-	BlockNumber n_blocks = 0;
+	BlockNumber n_blocks = 0;
 
 	switch (reln->smgr_relpersistence)
 	{
@@ -1694,9 +1728,10 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
 	}
 
 	/*
-	 * Usually Postgres doesn't extend relation on more than one page
-	 * (leaving holes). But this rule is violated in PG-15 where CreateAndCopyRelationData
-	 * call smgrextend for destination relation n using size of source relation
+	 * Usually Postgres doesn't extend a relation by more than one page
+	 * (leaving holes). But this rule is violated in PG-15 where
+	 * CreateAndCopyRelationData calls smgrextend for the destination relation
+	 * using the size of the source relation
 	 */
 	n_blocks = neon_nblocks(reln, forkNum);
 	while (n_blocks < blkno)
@@ -1717,11 +1752,13 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
 	if (IS_LOCAL_REL(reln))
 		mdextend(reln, forkNum, blkno, buffer, skipFsync);
 #endif
+
 	/*
-	 * smgr_extend is often called with an all-zeroes page, so lsn==InvalidXLogRecPtr.
-	 * An smgr_write() call will come for the buffer later, after it has been initialized
-	 * with the real page contents, and it is eventually evicted from the buffer cache.
-	 * But we need a valid LSN to the relation metadata update now.
+	 * smgr_extend is often called with an all-zeroes page, so
+	 * lsn==InvalidXLogRecPtr. An smgr_write() call will come for the buffer
+	 * later, after it has been initialized with the real page contents, and
+	 * it is eventually evicted from the buffer cache. But we need a valid LSN
+	 * for the relation metadata update now.
 	 */
 	if (lsn == InvalidXLogRecPtr)
 	{
@@ -1780,9 +1817,9 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
 	if ((uint64) blocknum + nblocks >= (uint64) InvalidBlockNumber)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-				errmsg("cannot extend file \"%s\" beyond %u blocks",
-					relpath(reln->smgr_rlocator, forkNum),
-					InvalidBlockNumber)));
+				 errmsg("cannot extend file \"%s\" beyond %u blocks",
+						relpath(reln->smgr_rlocator, forkNum),
+						InvalidBlockNumber)));
 
 	/* Don't log any pages if we're not allowed to do so. */
 	if (!XLogInsertAllowed())
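The neon_zeroextend() hunk keeps the guard `(uint64) blocknum + nblocks >= (uint64) InvalidBlockNumber`; the casts widen both operands before the addition so the check itself cannot wrap around in 32-bit arithmetic. A tiny sketch of the same pattern, with INVALID_BLOCK standing in for InvalidBlockNumber:

#include <stdint.h>
#include <stdbool.h>

#define INVALID_BLOCK UINT32_MAX	/* stand-in for InvalidBlockNumber */

/*
 * Would extending by nblocks run past the valid block-number space?
 * Widening to uint64_t before adding avoids uint32 overflow in the check.
 */
static bool
extend_would_overflow(uint32_t blocknum, uint32_t nblocks)
{
	return (uint64_t) blocknum + (uint64_t) nblocks >= (uint64_t) INVALID_BLOCK;
}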
 	 */
 	if (!XLogInsertAllowed())
 		return;
@@ -1869,7 +1906,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
 
 	switch (reln->smgr_relpersistence)
 	{
-		case 0: /* probably shouldn't happen, but ignore it */
+		case 0:					/* probably shouldn't happen, but ignore it */
 		case RELPERSISTENCE_PERMANENT:
 			break;
 
@@ -1884,9 +1921,10 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
 	if (lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum))
 		return false;
 
-	tag = (BufferTag) {
+	tag = (BufferTag)
+	{
 		.forkNum = forknum,
-		.blockNum = blocknum
+			.blockNum = blocknum
 	};
 
 	CopyNRelFileInfoToBufTag(tag, InfoFromSMgrRel(reln));
@@ -1941,11 +1979,11 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
  * To avoid breaking tests in the runtime please keep function signature in sync.
  */
 #if PG_MAJORVERSION_NUM < 16
-void PGDLLEXPORT
+void		PGDLLEXPORT
 neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
 				 XLogRecPtr request_lsn, bool request_latest, char *buffer)
 #else
-void PGDLLEXPORT
+void		PGDLLEXPORT
 neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
 				 XLogRecPtr request_lsn, bool request_latest, void *buffer)
 #endif
@@ -1956,21 +1994,21 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
 	PrfHashEntry *entry;
 	PrefetchRequest *slot;
 
-	buftag = (BufferTag) {
+	buftag = (BufferTag)
+	{
 		.forkNum = forkNum,
-		.blockNum = blkno,
+			.blockNum = blkno,
 	};
 
 	CopyNRelFileInfoToBufTag(buftag, rinfo);
 
 	/*
 	 * The redo process does not lock pages that it needs to replay but are
-	 * not in the shared buffers, so a concurrent process may request the
-	 * page after redo has decided it won't redo that page and updated the
-	 * LwLSN for that page.
-	 * If we're in hot standby we need to take care that we don't return
-	 * until after REDO has finished replaying up to that LwLSN, as the page
-	 * should have been locked up to that point.
+	 * not in the shared buffers, so a concurrent process may request the page
+	 * after redo has decided it won't redo that page and updated the LwLSN
+	 * for that page. If we're in hot standby we need to take care that we
+	 * don't return until after REDO has finished replaying up to that LwLSN,
+	 * as the page should have been locked up to that point.
 	 *
 	 * See also the description on neon_redo_read_buffer_filter below.
 	 *
@@ -1978,7 +2016,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
 	 * concurrent failed read IOs. Those IOs should never have a request_lsn
 	 * that is as large as the WAL record we're currently replaying, if it
 	 * weren't for the behaviour of the LwLsn cache that uses the highest
-	 * value of the LwLsn cache when the entry is not found. 
+	 * value of the LwLsn cache when the entry is not found.
 	 */
 	if (RecoveryInProgress() && !(MyBackendType == B_STARTUP))
 		XLogWaitForReplayOf(request_lsn);
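Several hunks here only re-wrap C99 compound literals with designated initializers, the `tag = (BufferTag) { .forkNum = ..., .blockNum = ... }` form. For reference, a minimal example of the construct with an invented struct, not the patch's own types:

#include <stdint.h>

typedef struct
{
	int			forkNum;
	uint32_t	blockNum;
} Tag;

static Tag
make_tag(int forknum, uint32_t blockno)
{
	/* C99 compound literal with designated initializers */
	Tag			tag = (Tag) {
		.forkNum = forknum,
		.blockNum = blockno,
	};

	return tag;
}

Any member not named in the initializer list is zero-initialized, which is why the pattern is convenient for tag-like structs.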
@@ -1996,12 +2034,14 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
 			ring_index = slot->my_ring_index;
 			pgBufferUsage.prefetch.hits += 1;
 		}
-		else /* the current prefetch LSN is not large enough, so drop the prefetch */
+		else					/* the current prefetch LSN is not large
+								 * enough, so drop the prefetch */
 		{
 			/*
 			 * We can't drop cache for not-yet-received requested items. It is
-			 * unlikely this happens, but it can happen if prefetch distance is
-			 * large enough and a backend didn't consume all prefetch requests.
+			 * unlikely this happens, but it can happen if prefetch distance
+			 * is large enough and a backend didn't consume all prefetch
+			 * requests.
 			 */
 			if (slot->status == PRFS_REQUESTED)
 			{
@@ -2028,11 +2068,11 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
 	else
 	{
 		/*
-		 * Empty our reference to the prefetch buffer's hash entry.
-		 * When we wait for prefetches, the entry reference is invalidated by
-		 * potential updates to the hash, and when we reconnect to the
-		 * pageserver the prefetch we're waiting for may be dropped,
-		 * in which case we need to retry and take the branch above.
+		 * Empty our reference to the prefetch buffer's hash entry. When
+		 * we wait for prefetches, the entry reference is invalidated by
+		 * potential updates to the hash, and when we reconnect to the
+		 * pageserver the prefetch we're waiting for may be dropped, in
+		 * which case we need to retry and take the branch above.
 		 */
 		entry = NULL;
 	}
@@ -2080,11 +2120,11 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
 * neon_read() -- Read the specified block from a relation.
 */
 void
-neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
+ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
 #if PG_MAJORVERSION_NUM < 16
-		  char *buffer)
+		   char *buffer)
 #else
-		  void *buffer)
+		   void *buffer)
 #endif
 {
 	bool		latest;
@@ -2219,11 +2259,11 @@ hexdump_page(char *page)
 * use mdextend().
 */
 void
-neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 #if PG_MAJORVERSION_NUM < 16
-		   char *buffer, bool skipFsync)
+			char *buffer, bool skipFsync)
 #else
-		   const void *buffer, bool skipFsync)
+			const void *buffer, bool skipFsync)
 #endif
 {
 	XLogRecPtr	lsn;
@@ -2727,6 +2767,7 @@ static void
 neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, XLogRecPtr end_recptr)
 {
 	BlockNumber relsize;
+
 	/* Extend the relation if we know its size */
 	if (get_cached_relsize(rinfo, forknum, &relsize))
 	{
@@ -2739,11 +2780,11 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
 	else
 	{
 		/*
-		 * Size was not cached. We populate the cache now, with the size of the
-		 * relation measured after this WAL record is applied.
+		 * Size was not cached. We populate the cache now, with the size of
+		 * the relation measured after this WAL record is applied.
 		 *
-		 * This length is later reused when we open the smgr to read the block,
-		 * which is fine and expected.
+		 * This length is later reused when we open the smgr to read the
+		 * block, which is fine and expected.
 		 */
 		NeonResponse *response;
@@ -2763,7 +2804,7 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
 	Assert(response->tag == T_NeonNblocksResponse);
 	nbresponse = (NeonNblocksResponse *) response;
 
-	relsize = Max(nbresponse->n_blocks, blkno+1);
+	relsize = Max(nbresponse->n_blocks, blkno + 1);
 
 	set_cached_relsize(rinfo, forknum, relsize);
 	SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
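On the `Max(nbresponse->n_blocks, blkno + 1)` line this hunk touches: when the size cache is populated while replaying a record for block blkno, the cached size must be at least blkno + 1 even if the reported relation size lags behind the record. A one-function sketch of that clamp, where BlockNumber and Max are stand-ins for the Postgres definitions:

#include <stdint.h>

typedef uint32_t BlockNumber;
#define Max(a, b) ((a) > (b) ? (a) : (b))

/*
 * Pick the size to cache during redo: at least large enough to contain
 * the block the WAL record being replayed refers to.
 */
static BlockNumber
relsize_for_redo(BlockNumber reported_blocks, BlockNumber blkno)
{
	return Max(reported_blocks, blkno + 1);
}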
@@ -2805,7 +2846,7 @@ get_fsm_physical_block(BlockNumber heapblk)
 
 /*
 * Return whether we can skip the redo for this block.
- * 
+ *
 * The conditions for skipping the IO are:
 *
 * - The block is not in the shared buffers, and
@@ -2844,7 +2885,7 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
 	XLogRecPtr	end_recptr = record->EndRecPtr;
 	NRelFileInfo rinfo;
 	ForkNumber	forknum;
-	BlockNumber blkno;
+	BlockNumber blkno;
 	BufferTag	tag;
 	uint32		hash;
 	LWLock	   *partitionLock;
@@ -2863,8 +2904,8 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
 
 	/*
 	 * Out of an abundance of caution, we always run redo on shared catalogs,
-	 * regardless of whether the block is stored in shared buffers.
-	 * See also this function's top comment.
+	 * regardless of whether the block is stored in shared buffers. See also
+	 * this function's top comment.
 	 */
 	if (!OidIsValid(NInfoGetDbOid(rinfo)))
 		return false;
@@ -2890,8 +2931,9 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
 	/* In both cases st lwlsn past this WAL record */
 	SetLastWrittenLSNForBlock(end_recptr, rinfo, forknum, blkno);
 
-	/* we don't have the buffer in memory, update lwLsn past this record,
-	 * also evict page fro file cache
+	/*
+	 * we don't have the buffer in memory, update lwLsn past this record, also
+	 * evict the page from the file cache
 	 */
 	if (no_redo_needed)
 		lfc_evict(rinfo, forknum, blkno);
diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c
index 10544ba7a8..7d9dbfdb7f 100644
--- a/pgxn/neon/walproposer.c
+++ b/pgxn/neon/walproposer.c
@@ -178,7 +178,7 @@ WalProposerFree(WalProposer *wp)
 	if (wp->propTermHistory.entries != NULL)
 		pfree(wp->propTermHistory.entries);
 	wp->propTermHistory.entries = NULL;
-	
+
 	pfree(wp);
 }
 
@@ -275,7 +275,7 @@ WalProposerPoll(WalProposer *wp)
 								  wp->config->safekeeper_connection_timeout))
 		{
 			walprop_log(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that",
-					sk->host, sk->port, FormatSafekeeperState(sk->state), wp->config->safekeeper_connection_timeout);
+						sk->host, sk->port, FormatSafekeeperState(sk->state), wp->config->safekeeper_connection_timeout);
 			ShutdownConnection(sk);
 		}
 	}
@@ -395,7 +395,7 @@ ResetConnection(Safekeeper *sk)
 	 * https://www.postgresql.org/docs/devel/libpq-connect.html#LIBPQ-PQCONNECTSTARTPARAMS
 	 */
 	walprop_log(WARNING, "Immediate failure to connect with node '%s:%s':\n\terror: %s",
-			sk->host, sk->port, wp->api.conn_error_message(sk));
+				sk->host, sk->port, wp->api.conn_error_message(sk));
 
 	/*
 	 * Even though the connection failed, we still need to clean up the
@@ -489,7 +489,7 @@ AdvancePollState(Safekeeper *sk, uint32 events)
 		 */
 		case SS_OFFLINE:
 			walprop_log(FATAL, "Unexpected safekeeper %s:%s state advancement: is offline",
-					sk->host, sk->port);
+						sk->host, sk->port);
 			break;				/* actually unreachable, but prevents
 								 * -Wimplicit-fallthrough */
 
@@ -525,7 +525,7 @@ AdvancePollState(Safekeeper *sk, uint32 events)
 		 */
 		case SS_VOTING:
 			walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
-					sk->port, FormatSafekeeperState(sk->state));
+						sk->port, FormatSafekeeperState(sk->state));
 			ResetConnection(sk);
 			return;
 
@@ -554,7 +554,7 @@ AdvancePollState(Safekeeper *sk, uint32 events)
 		 */
 		case SS_IDLE:
 			walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
-					sk->port, FormatSafekeeperState(sk->state));
+						sk->port, FormatSafekeeperState(sk->state));
 			ResetConnection(sk);
 			return;
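The comment block reflowed above spells out when redo may be skipped; reduced to a predicate, the rule is "never skip shared catalogs, otherwise skip exactly when the block is not in shared buffers". A hedged sketch of that decision (the struct is invented for illustration; the real checks live in neon_redo_read_buffer_filter):

#include <stdbool.h>

typedef struct
{
	bool		in_shared_buffers;
	bool		is_shared_catalog;
} RedoBlockInfo;

/*
 * Redo can be skipped only for blocks that are not cached and do not
 * belong to a shared catalog; everything else must be replayed.
 */
static bool
can_skip_redo(const RedoBlockInfo *info)
{
	if (info->is_shared_catalog)
		return false;			/* always replay shared catalogs */
	return !info->in_shared_buffers;
}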
@@ -580,7 +580,7 @@ HandleConnectionEvent(Safekeeper *sk)
 	{
 		case WP_CONN_POLLING_OK:
 			walprop_log(LOG, "connected with node %s:%s", sk->host,
-					sk->port);
+						sk->port);
 
 			sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp);
 
 			/*
@@ -604,7 +604,7 @@ HandleConnectionEvent(Safekeeper *sk)
 
 		case WP_CONN_POLLING_FAILED:
 			walprop_log(WARNING, "failed to connect to node '%s:%s': %s",
-					sk->host, sk->port, wp->api.conn_error_message(sk));
+						sk->host, sk->port, wp->api.conn_error_message(sk));
 
 			/*
 			 * If connecting failed, we don't want to restart the connection
@@ -641,7 +641,7 @@ SendStartWALPush(Safekeeper *sk)
 	if (!wp->api.conn_send_query(sk, "START_WAL_PUSH"))
 	{
 		walprop_log(WARNING, "Failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s",
-				sk->host, sk->port, wp->api.conn_error_message(sk));
+					sk->host, sk->port, wp->api.conn_error_message(sk));
 		ShutdownConnection(sk);
 		return;
 	}
@@ -678,7 +678,7 @@ RecvStartWALPushResult(Safekeeper *sk)
 
 		case WP_EXEC_FAILED:
 			walprop_log(WARNING, "Failed to send query to safekeeper %s:%s: %s",
-					sk->host, sk->port, wp->api.conn_error_message(sk));
+						sk->host, sk->port, wp->api.conn_error_message(sk));
 			ShutdownConnection(sk);
 			return;
 
@@ -689,7 +689,7 @@ RecvStartWALPushResult(Safekeeper *sk)
 		 */
 		case WP_EXEC_UNEXPECTED_SUCCESS:
 			walprop_log(WARNING, "Received bad response from safekeeper %s:%s query execution",
-					sk->host, sk->port);
+						sk->host, sk->port);
 			ShutdownConnection(sk);
 			return;
 	}
@@ -758,8 +758,8 @@ RecvAcceptorGreeting(Safekeeper *sk)
 	{
 		/* Another compute with higher term is running. */
 		walprop_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "",
-				sk->host, sk->port,
-				sk->greetResponse.term, wp->propTerm);
+					sk->host, sk->port,
+					sk->greetResponse.term, wp->propTerm);
 	}
 
 	/*
@@ -817,11 +817,11 @@ RecvVoteResponse(Safekeeper *sk)
 		return;
 
 	walprop_log(LOG,
-			"got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X",
-			sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory),
-			LSN_FORMAT_ARGS(sk->voteResponse.flushLsn),
-			LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn),
-			LSN_FORMAT_ARGS(sk->voteResponse.timelineStartLsn));
+				"got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X",
+				sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory),
+				LSN_FORMAT_ARGS(sk->voteResponse.flushLsn),
+				LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn),
+				LSN_FORMAT_ARGS(sk->voteResponse.timelineStartLsn));
 
 	/*
 	 * In case of acceptor rejecting our vote, bail out, but only if either it
@@ -832,8 +832,8 @@ RecvVoteResponse(Safekeeper *sk)
 		(sk->voteResponse.term > wp->propTerm || wp->n_votes < wp->quorum))
 	{
 		walprop_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "",
-				sk->host, sk->port,
-				sk->voteResponse.term, wp->propTerm);
+					sk->host, sk->port,
+					sk->voteResponse.term, wp->propTerm);
 	}
 
 	Assert(sk->voteResponse.term == wp->propTerm);
@@ -877,10 +877,10 @@ HandleElectedProposer(WalProposer *wp)
 	if (wp->truncateLsn < wp->propEpochStartLsn)
 	{
 		walprop_log(LOG,
-				"start recovery because truncateLsn=%X/%X is not "
-				"equal to epochStartLsn=%X/%X",
-				LSN_FORMAT_ARGS(wp->truncateLsn),
-				LSN_FORMAT_ARGS(wp->propEpochStartLsn));
+					"start recovery because truncateLsn=%X/%X is not "
+					"equal to epochStartLsn=%X/%X",
+					LSN_FORMAT_ARGS(wp->truncateLsn),
+					LSN_FORMAT_ARGS(wp->propEpochStartLsn));
 		/* Perform recovery */
 		if (!wp->api.recovery_download(&wp->safekeeper[wp->donor], wp->greetRequest.timeline, wp->truncateLsn, wp->propEpochStartLsn))
 			walprop_log(FATAL, "Failed to recover state");
@@ -990,9 +990,9 @@ DetermineEpochStartLsn(WalProposer *wp)
 					wp->timelineStartLsn != wp->safekeeper[i].voteResponse.timelineStartLsn)
 				{
 					walprop_log(WARNING,
-							"inconsistent timelineStartLsn: current %X/%X, received %X/%X",
-							LSN_FORMAT_ARGS(wp->timelineStartLsn),
-							LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn));
+								"inconsistent timelineStartLsn: current %X/%X, received %X/%X",
+								LSN_FORMAT_ARGS(wp->timelineStartLsn),
+								LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn));
 				}
 				wp->timelineStartLsn = wp->safekeeper[i].voteResponse.timelineStartLsn;
 			}
@@ -1038,11 +1038,11 @@ DetermineEpochStartLsn(WalProposer *wp)
 	wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propEpochStartLsn;
 
 	walprop_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X",
-			wp->quorum,
-			wp->propTerm,
-			LSN_FORMAT_ARGS(wp->propEpochStartLsn),
-			wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port,
-			LSN_FORMAT_ARGS(wp->truncateLsn));
+				wp->quorum,
+				wp->propTerm,
+				LSN_FORMAT_ARGS(wp->propEpochStartLsn),
+				wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port,
+				LSN_FORMAT_ARGS(wp->truncateLsn));
 
 	/*
 	 * Ensure the basebackup we are running (at RedoStartLsn) matches LSN
@@ -1070,18 +1070,18 @@ DetermineEpochStartLsn(WalProposer *wp)
 				 walprop_shared->mineLastElectedTerm)))
 		{
 			walprop_log(PANIC,
-					"collected propEpochStartLsn %X/%X, but basebackup LSN %X/%X",
-					LSN_FORMAT_ARGS(wp->propEpochStartLsn),
-					LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp)));
+						"collected propEpochStartLsn %X/%X, but basebackup LSN %X/%X",
+						LSN_FORMAT_ARGS(wp->propEpochStartLsn),
+						LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp)));
 		}
 	}
 	walprop_shared->mineLastElectedTerm = wp->propTerm;
 
 	/*
-	 * WalProposer has just elected itself and initialized history, so
-	 * we can call election callback. Usually it updates truncateLsn to
-	 * fetch WAL for logical replication.
+	 * WalProposer has just elected itself and initialized history, so we can
+	 * call election callback. Usually it updates truncateLsn to fetch WAL for
+	 * logical replication.
 	 */
 	wp->api.after_election(wp);
 }
@@ -1155,8 +1155,8 @@ SendProposerElected(Safekeeper *sk)
 			sk->startStreamingAt = wp->truncateLsn;
 
 			walprop_log(WARNING, "empty safekeeper joined cluster as %s:%s, historyStart=%X/%X, sk->startStreamingAt=%X/%X",
-					sk->host, sk->port, LSN_FORMAT_ARGS(wp->propTermHistory.entries[0].lsn),
-					LSN_FORMAT_ARGS(sk->startStreamingAt));
+						sk->host, sk->port, LSN_FORMAT_ARGS(wp->propTermHistory.entries[0].lsn),
+						LSN_FORMAT_ARGS(sk->startStreamingAt));
 		}
 	}
 	else
 	{
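DetermineEpochStartLsn(), whose log calls are re-indented above, acts once a majority of safekeepers have voted and treats the voter with the highest flushed LSN as the donor for recovery. The following standalone sketch mirrors the quorum-and-donor idea only; it is not the patch's code, and the types are invented:

#include <stdint.h>
#include <stdbool.h>

typedef struct
{
	bool		voteGiven;
	uint64_t	flushLsn;
} Vote;

/*
 * Return the donor index (the voter with the highest flushLsn) once a
 * majority of n_safekeepers has voted, or -1 if there is no quorum yet.
 */
static int
pick_donor(const Vote *votes, int n_safekeepers)
{
	int			quorum = n_safekeepers / 2 + 1;
	int			n_votes = 0;
	int			donor = -1;

	for (int i = 0; i < n_safekeepers; i++)
	{
		if (!votes[i].voteGiven)
			continue;
		n_votes++;
		if (donor < 0 || votes[i].flushLsn > votes[donor].flushLsn)
			donor = i;
	}
	return (n_votes >= quorum) ? donor : -1;
}

A majority of n/2 + 1 guarantees any two elected proposers share at least one safekeeper, which is what makes the term comparison in the surrounding code safe.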
@@ -1190,8 +1190,8 @@ SendProposerElected(Safekeeper *sk)
 	lastCommonTerm = i >= 0 ? wp->propTermHistory.entries[i].term : 0;
 
 	walprop_log(LOG,
-			"sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X",
-			sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn));
+				"sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X",
+				sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn));
 
 	resetStringInfo(&sk->outbuf);
 	pq_sendint64_le(&sk->outbuf, msg.tag);
@@ -1355,11 +1355,11 @@ SendAppendRequests(Safekeeper *sk)
 		PrepareAppendRequest(sk->wp, &sk->appendRequest, sk->streamingAt, endLsn);
 
 		walprop_log(DEBUG2, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
-				req->endLsn - req->beginLsn,
-				LSN_FORMAT_ARGS(req->beginLsn),
-				LSN_FORMAT_ARGS(req->endLsn),
-				LSN_FORMAT_ARGS(req->commitLsn),
-				LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port);
+					req->endLsn - req->beginLsn,
+					LSN_FORMAT_ARGS(req->beginLsn),
+					LSN_FORMAT_ARGS(req->endLsn),
+					LSN_FORMAT_ARGS(req->commitLsn),
+					LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port);
 
 		resetStringInfo(&sk->outbuf);
@@ -1398,8 +1398,8 @@ SendAppendRequests(Safekeeper *sk)
 
 			case PG_ASYNC_WRITE_FAIL:
 				walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
-						sk->host, sk->port, FormatSafekeeperState(sk->state),
-						wp->api.conn_error_message(sk));
+							sk->host, sk->port, FormatSafekeeperState(sk->state),
+							wp->api.conn_error_message(sk));
 				ShutdownConnection(sk);
 				return false;
 			default:
@@ -1438,17 +1438,17 @@ RecvAppendResponses(Safekeeper *sk)
 			break;
 
 		walprop_log(DEBUG2, "received message term=" INT64_FORMAT " flushLsn=%X/%X commitLsn=%X/%X from %s:%s",
-				sk->appendResponse.term,
-				LSN_FORMAT_ARGS(sk->appendResponse.flushLsn),
-				LSN_FORMAT_ARGS(sk->appendResponse.commitLsn),
-				sk->host, sk->port);
+					sk->appendResponse.term,
+					LSN_FORMAT_ARGS(sk->appendResponse.flushLsn),
+					LSN_FORMAT_ARGS(sk->appendResponse.commitLsn),
+					sk->host, sk->port);
 
 		if (sk->appendResponse.term > wp->propTerm)
 		{
 			/* Another compute with higher term is running. */
 			walprop_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT "",
-					sk->host, sk->port,
-					sk->appendResponse.term, wp->propTerm);
+						sk->host, sk->port,
+						sk->appendResponse.term, wp->propTerm);
 		}
 
 		readAnything = true;
@@ -1493,7 +1493,7 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
 			/* read value length */
 			rf->currentClusterSize = pq_getmsgint64(reply_message);
 			walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: current_timeline_size %lu",
-					rf->currentClusterSize);
+						rf->currentClusterSize);
 		}
 		else if ((strcmp(key, "ps_writelsn") == 0) || (strcmp(key, "last_received_lsn") == 0))
 		{
@@ -1501,7 +1501,7 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
 			/* read value length */
 			rf->last_received_lsn = pq_getmsgint64(reply_message);
 			walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: last_received_lsn %X/%X",
-					LSN_FORMAT_ARGS(rf->last_received_lsn));
+						LSN_FORMAT_ARGS(rf->last_received_lsn));
 		}
 		else if ((strcmp(key, "ps_flushlsn") == 0) || (strcmp(key, "disk_consistent_lsn") == 0))
 		{
@@ -1509,7 +1509,7 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
 			/* read value length */
 			rf->disk_consistent_lsn = pq_getmsgint64(reply_message);
 			walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: disk_consistent_lsn %X/%X",
-					LSN_FORMAT_ARGS(rf->disk_consistent_lsn));
+						LSN_FORMAT_ARGS(rf->disk_consistent_lsn));
 		}
 		else if ((strcmp(key, "ps_applylsn") == 0) || (strcmp(key, "remote_consistent_lsn") == 0))
 		{
@@ -1517,7 +1517,7 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
 			/* read value length */
 			rf->remote_consistent_lsn = pq_getmsgint64(reply_message);
 			walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: remote_consistent_lsn %X/%X",
-					LSN_FORMAT_ARGS(rf->remote_consistent_lsn));
+						LSN_FORMAT_ARGS(rf->remote_consistent_lsn));
 		}
 		else if ((strcmp(key, "ps_replytime") == 0) || (strcmp(key, "replytime") == 0))
 		{
@@ -1530,7 +1530,7 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
 			/* Copy because timestamptz_to_str returns a static buffer */
 			replyTimeStr = pstrdup(timestamptz_to_str(rf->replytime));
 			walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: replytime %lu reply_time: %s",
-					rf->replytime, replyTimeStr);
+						rf->replytime, replyTimeStr);
 
 			pfree(replyTimeStr);
 		}
@@ -1700,8 +1700,8 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
 		case PG_ASYNC_READ_FAIL:
 			walprop_log(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host,
-					sk->port, FormatSafekeeperState(sk->state),
-					wp->api.conn_error_message(sk));
+						sk->port, FormatSafekeeperState(sk->state),
+						wp->api.conn_error_message(sk));
 			ShutdownConnection(sk);
 			return false;
 	}
@@ -1740,7 +1740,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
 	if (tag != anymsg->tag)
 	{
 		walprop_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
-				sk->port, FormatSafekeeperState(sk->state));
+					sk->port, FormatSafekeeperState(sk->state));
 		ResetConnection(sk);
 		return false;
 	}
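ParsePageserverFeedbackMessage(), reformatted above, dispatches on string keys read from the reply buffer and accepts both the old ps_* spellings and the newer names. A compact sketch of that key/value dispatch pattern; the keys and struct fields here are illustrative, not the full feedback protocol:

#include <stdint.h>
#include <string.h>

typedef struct
{
	uint64_t	current_size;
	uint64_t	last_received_lsn;
} Feedback;

/*
 * Apply one key/value pair to the feedback struct, tolerating both the
 * legacy and the current key spelling, as the walproposer does.
 */
static void
apply_feedback_kv(Feedback *fb, const char *key, uint64_t value)
{
	if (strcmp(key, "current_timeline_size") == 0)
		fb->current_size = value;
	else if (strcmp(key, "ps_writelsn") == 0 ||
			 strcmp(key, "last_received_lsn") == 0)
		fb->last_received_lsn = value;
	/* unknown keys are ignored for forward compatibility */
}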
@@ -1816,8 +1816,8 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes
 	if (!wp->api.conn_blocking_write(sk, msg, msg_size))
 	{
 		walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
-				sk->host, sk->port, FormatSafekeeperState(sk->state),
-				wp->api.conn_error_message(sk));
+					sk->host, sk->port, FormatSafekeeperState(sk->state),
+					wp->api.conn_error_message(sk));
 		ShutdownConnection(sk);
 		return false;
 	}
@@ -1863,8 +1863,8 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta
 			return false;
 		case PG_ASYNC_WRITE_FAIL:
 			walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
-					sk->host, sk->port, FormatSafekeeperState(sk->state),
-					wp->api.conn_error_message(sk));
+						sk->host, sk->port, FormatSafekeeperState(sk->state),
+						wp->api.conn_error_message(sk));
 			ShutdownConnection(sk);
 			return false;
 		default:
@@ -1902,8 +1902,8 @@ AsyncFlush(Safekeeper *sk)
 			return false;
 		case -1:
 			walprop_log(WARNING, "Failed to flush write to node %s:%s in %s state: %s",
-					sk->host, sk->port, FormatSafekeeperState(sk->state),
-					wp->api.conn_error_message(sk));
+						sk->host, sk->port, FormatSafekeeperState(sk->state),
+						wp->api.conn_error_message(sk));
 			ResetConnection(sk);
 			return false;
 		default:
@@ -2008,7 +2008,7 @@ AssertEventsOkForState(uint32 events, Safekeeper *sk)
 		 * and then an assertion that's guaranteed to fail.
 		 */
 		walprop_log(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
-				FormatEvents(wp, events), sk->host, sk->port, FormatSafekeeperState(sk->state));
+					FormatEvents(wp, events), sk->host, sk->port, FormatSafekeeperState(sk->state));
 		Assert(events_ok_for_state);
 	}
 }
@@ -2111,7 +2111,7 @@ FormatEvents(WalProposer *wp, uint32 events)
 	if (events & (~all_flags))
 	{
 		walprop_log(WARNING, "Event formatting found unexpected component %d",
-				events & (~all_flags));
+					events & (~all_flags));
 		return_str[6] = '*';
 		return_str[7] = '\0';
 	}
diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h
index 664aeedfa7..ae7812e710 100644
--- a/pgxn/neon/walproposer.h
+++ b/pgxn/neon/walproposer.h
@@ -356,7 +356,8 @@ typedef struct Safekeeper
 
 	/* postgres-specific fields */
-	#ifndef WALPROPOSER_LIB
+#ifndef WALPROPOSER_LIB
+
 	/*
 	 * postgres protocol connection to the WAL acceptor
 	 *
@@ -374,17 +375,18 @@ typedef struct Safekeeper
 	 * Position in wait event set. Equal to -1 if no event
 	 */
 	int			eventPos;
-	#endif
+#endif
 
 	/* WalProposer library specifics */
-	#ifdef WALPROPOSER_LIB
+#ifdef WALPROPOSER_LIB
+
 	/*
 	 * Buffer for incoming messages. Usually Rust vector is stored here.
 	 * Caller is responsible for freeing the buffer.
 	 */
 	StringInfoData inbuf;
-	#endif
+#endif
 } Safekeeper;
 
 /* Re-exported PostgresPollingStatusType */
@@ -472,7 +474,7 @@ typedef struct walproposer_api
 	WalProposerConnStatusType (*conn_status) (Safekeeper *sk);
 
 	/* Start the connection, aka PQconnectStart. */
-	void (*conn_connect_start) (Safekeeper *sk);
+	void		(*conn_connect_start) (Safekeeper *sk);
 
 	/* Poll an asynchronous connection, aka PQconnectPoll. */
 	WalProposerConnectPollStatusType (*conn_connect_poll) (Safekeeper *sk);
@@ -490,7 +492,7 @@ typedef struct walproposer_api
 	void		(*conn_finish) (Safekeeper *sk);
 
 	/*
-	 * Try to read CopyData message from the safekeeper, aka PQgetCopyData. 
+	 * Try to read CopyData message from the safekeeper, aka PQgetCopyData.
 	 *
 	 * On success, the data is placed in *buf. It is valid until the next call
 	 * to this function.
@@ -510,7 +512,7 @@ typedef struct walproposer_api
 	void		(*wal_read) (Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count);
 
 	/* Allocate WAL reader. */
-	void (*wal_reader_allocate) (Safekeeper *sk);
+	void		(*wal_reader_allocate) (Safekeeper *sk);
 
 	/* Deallocate event set. */
 	void		(*free_event_set) (WalProposer *wp);
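The walproposer_api struct whose members are re-aligned above is a table of function pointers, which lets the same proposer logic run against libpq inside Postgres or against Rust callbacks when built with WALPROPOSER_LIB. A minimal runnable example of the vtable pattern, with stub names invented for illustration:

#include <stdio.h>

/* A minimal vtable-style API in the spirit of walproposer_api. */
typedef struct io_api
{
	int			(*send) (void *conn, const char *buf, int len);
	void		(*log) (const char *msg);
} io_api;

static int
stub_send(void *conn, const char *buf, int len)
{
	(void) conn;
	(void) buf;
	return len;					/* pretend everything was written */
}

static void
stub_log(const char *msg)
{
	fprintf(stderr, "%s\n", msg);
}

int
main(void)
{
	io_api		api = {.send = stub_send, .log = stub_log};

	api.log("sending greeting");
	return api.send(NULL, "hi", 2) == 2 ? 0 : 1;
}

Swapping the two stubs for libpq-backed or FFI-backed implementations changes the transport without touching the calling code.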
@@ -572,7 +574,7 @@ typedef struct walproposer_api
 	/*
 	 * Called right after the proposer was elected, but before it started
 	 * recovery and sent ProposerElected message to the safekeepers.
-	 * 
+	 *
 	 * Used by logical replication to update truncateLsn.
 	 */
 	void		(*after_election) (WalProposer *wp);
@@ -626,10 +628,10 @@ typedef struct WalProposerConfig
 	uint64		systemId;
 
 	/* Will be passed to safekeepers in greet request. */
-	TimeLineID pgTimeline;
+	TimeLineID	pgTimeline;
 
 #ifdef WALPROPOSER_LIB
-	void *callback_data;
+	void	   *callback_data;
 #endif
 } WalProposerConfig;
 
@@ -710,10 +712,11 @@ extern void WalProposerPoll(WalProposer *wp);
 extern void WalProposerFree(WalProposer *wp);
 
-#define WPEVENT 1337 /* special log level for walproposer internal events */
+#define WPEVENT		1337	/* special log level for walproposer internal
+							 * events */
 
 #ifdef WALPROPOSER_LIB
-void WalProposerLibLog(WalProposer *wp, int elevel, char *fmt, ...);
+void		WalProposerLibLog(WalProposer *wp, int elevel, char *fmt,...);
 #define walprop_log(elevel, ...) WalProposerLibLog(wp, elevel, __VA_ARGS__)
 #else
 #define walprop_log(elevel, ...) elog(elevel, __VA_ARGS__)
diff --git a/pgxn/neon/walproposer_compat.c b/pgxn/neon/walproposer_compat.c
index 7617f21a26..04b519ab15 100644
--- a/pgxn/neon/walproposer_compat.c
+++ b/pgxn/neon/walproposer_compat.c
@@ -9,8 +9,9 @@
 #include "utils/datetime.h"
 #include "miscadmin.h"
 
-void ExceptionalCondition(const char *conditionName,
-						  const char *fileName, int lineNumber)
+void
+ExceptionalCondition(const char *conditionName,
+					 const char *fileName, int lineNumber)
 {
 	fprintf(stderr, "ExceptionalCondition: %s:%d: %s\n",
 			fileName, lineNumber, conditionName);
@@ -169,17 +170,18 @@ timestamptz_to_str(TimestampTz t)
 
 bool
 TimestampDifferenceExceeds(TimestampTz start_time,
-						  TimestampTz stop_time,
-						  int msec)
+						   TimestampTz stop_time,
+						   int msec)
 {
 	TimestampTz diff = stop_time - start_time;
+
 	return (diff >= msec * INT64CONST(1000));
 }
 
 void
-WalProposerLibLog(WalProposer *wp, int elevel, char *fmt, ...)
+WalProposerLibLog(WalProposer *wp, int elevel, char *fmt,...)
 {
-	char buf[1024];
+	char		buf[1024];
 	va_list		args;
 
 	fmt = _(fmt);
diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c
index f83a08d407..551d56d416 100644
--- a/pgxn/neon/walproposer_pg.c
+++ b/pgxn/neon/walproposer_pg.c
@@ -637,8 +637,8 @@ walprop_connect_start(Safekeeper *sk)
 	 */
 	sk->conn = palloc(sizeof(WalProposerConn));
 	sk->conn->pg_conn = pg_conn;
-	sk->conn->is_nonblocking = false;	/* connections always start in blocking
-										 * mode */
+	sk->conn->is_nonblocking = false;	/* connections always start in
+										 * blocking mode */
 	sk->conn->recvbuf = NULL;
 }
@@ -1291,10 +1291,11 @@ XLogWalPropWrite(WalProposer *wp, char *buf, Size nbytes, XLogRecPtr recptr)
 	/*
 	 * Apart from walproposer, basebackup LSN page is also written out by
 	 * postgres itself which writes WAL only in pages, and in basebackup it is
-	 * inherently dummy (only safekeepers have historic WAL). Update WAL buffers
-	 * here to avoid dummy page overwriting correct one we download here. Ugly,
-	 * but alternatives are about the same ugly. We won't need that if we switch
-	 * to on-demand WAL download from safekeepers, without writing to disk.
+	 * inherently dummy (only safekeepers have historic WAL). Update WAL
+	 * buffers here to avoid the dummy page overwriting the correct one we
+	 * download here. Ugly, but the alternatives are about as ugly. We won't
+	 * need that if we switch to on-demand WAL download from safekeepers,
+	 * without writing to disk.
 	 *
 	 * https://github.com/neondatabase/neon/issues/5749
 	 */
@@ -1681,17 +1682,17 @@ walprop_pg_log_internal(WalProposer *wp, int level, const char *line)
 static void
 walprop_pg_after_election(WalProposer *wp)
 {
-	FILE* f;
-	XLogRecPtr lrRestartLsn;
+	FILE	   *f;
+	XLogRecPtr	lrRestartLsn;
 
-	/* We don't need to do anything in syncSafekeepers mode.*/
+	/* We don't need to do anything in syncSafekeepers mode. */
 	if (wp->config->syncSafekeepers)
 		return;
 
 	/*
-	 * If there are active logical replication subscription we need
-	 * to provide enough WAL for their WAL senders based on th position
-	 * of their replication slots.
+	 * If there are active logical replication subscriptions we need to
+	 * provide enough WAL for their WAL senders based on the position of
+	 * their replication slots.
 	 */
 	f = fopen("restart.lsn", "rb");
 	if (f != NULL && !wp->config->syncSafekeepers)
@@ -1700,8 +1701,12 @@ walprop_pg_after_election(WalProposer *wp)
 		fclose(f);
 		if (lrRestartLsn != InvalidXLogRecPtr)
 		{
-			elog(LOG, "Logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn));
-			/* start from the beginning of the segment to fetch page headers verifed by XLogReader */
+			elog(LOG, "Logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn));
+
+			/*
+			 * start from the beginning of the segment to fetch page headers
+			 * verified by XLogReader
+			 */
 			lrRestartLsn = lrRestartLsn - XLogSegmentOffset(lrRestartLsn, wal_segment_size);
 			wp->truncateLsn = Min(wp->truncateLsn, lrRestartLsn);
 		}
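The final hunk rounds the logical-replication restart LSN down to its WAL segment boundary by subtracting XLogSegmentOffset(lsn, wal_segment_size). Assuming, as in Postgres, that the segment size is a power of two, the offset is a simple mask; a sketch with invented names rather than the real macros:

#include <stdint.h>

typedef uint64_t XLogRecPtr;

/* Offset of an LSN within its segment; mirrors XLogSegmentOffset for
 * power-of-two segment sizes. */
#define SegmentOffset(lsn, segsize) ((lsn) & ((uint64_t) (segsize) - 1))

/* Round an LSN down to the start of its segment (e.g. 16 MB segments). */
static XLogRecPtr
segment_start(XLogRecPtr lsn, uint64_t wal_segment_size)
{
	return lsn - SegmentOffset(lsn, wal_segment_size);
}

Starting the fetch at a segment boundary guarantees the downloaded WAL begins with a page header that XLogReader can verify.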