Compare commits

...

3 Commits

Author SHA1 Message Date
Sasha Krassovsky
6c632f490f Switch lwlock to pthread_rwlock_t 2023-12-05 11:54:13 -08:00
Sasha Krassovsky
2a9ffa4686 fmt 2023-12-05 11:54:13 -08:00
Sasha Krassovsky
3afe386d86 Don't disconnect if pageserver didn't change 2023-12-05 11:54:13 -08:00
2 changed files with 34 additions and 13 deletions

View File

@@ -19,7 +19,6 @@
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/buf_internals.h"
#include "storage/lwlock.h"
#include "storage/ipc.h"
#include "storage/pg_shmem.h"
#include "c.h"
@@ -37,6 +36,8 @@
#include "walproposer.h"
#include "neon_utils.h"
#include <pthread.h>
#define PageStoreTrace DEBUG5
#define RECONNECT_INTERVAL_USEC 1000000
@@ -69,7 +70,7 @@ int max_reconnect_attempts = 60;
typedef struct
{
LWLockId lock;
pthread_rwlock_t lock;
pg_atomic_uint64 update_counter;
char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
} PagestoreShmemState;
@@ -105,10 +106,10 @@ AssignPageserverConnstring(const char *newval, void *extra)
{
if(!PagestoreShmemIsValid())
return;
LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE);
pthread_rwlock_wrlock(&pagestore_shared->lock);
strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE);
pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1);
LWLockRelease(pagestore_shared->lock);
pthread_rwlock_unlock(&pagestore_shared->lock);
}
static bool
@@ -119,15 +120,24 @@ CheckConnstringUpdated()
return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
}
static void
/* Returns true if the connstring has changed and false if not */
static bool
ReloadConnstring()
{
if(!PagestoreShmemIsValid())
return;
LWLockAcquire(pagestore_shared->lock, LW_SHARED);
return false;
pthread_rwlock_rdlock(&pagestore_shared->lock);
if(strcmp(local_pageserver_connstring, pagestore_shared->pageserver_connstring) == 0)
{
pthread_rwlock_unlock(&pagestore_shared->lock);
return false;
}
strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter);
LWLockRelease(pagestore_shared->lock);
pthread_rwlock_unlock(&pagestore_shared->lock);
return true;
}
static bool
@@ -290,7 +300,6 @@ pageserver_disconnect(void)
*/
if (connected)
{
neon_log(LOG, "dropping connection to page server due to error");
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
@@ -311,8 +320,12 @@ pageserver_send(NeonRequest * request)
if(CheckConnstringUpdated())
{
pageserver_disconnect();
ReloadConnstring();
bool should_disconnect = ReloadConnstring();
if(should_disconnect)
{
neon_log(LOG, "pageserver_send disconnect because connstring changed");
pageserver_disconnect();
}
}
/* If the connection was lost for some reason, reconnect */
@@ -484,7 +497,12 @@ PagestoreShmemInit(void)
&found);
if(!found)
{
pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock);
pthread_rwlockattr_t attr;
pthread_rwlockattr_init(&attr);
pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
pthread_rwlock_init(&pagestore_shared->lock, &attr);
pthread_rwlockattr_destroy(&attr);
pg_atomic_init_u64(&pagestore_shared->update_counter, 0);
AssignPageserverConnstring(page_server_connstring, NULL);
}

View File

@@ -60,7 +60,10 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)
endpoint.reconfigure(pageserver_id=alt_pageserver_id)
# Reconfigure it several times with the same connstring to make sure nothing
# breaks, since an unchanged connstring is handled specially (no disconnect)
for _ in range(5):
endpoint.reconfigure(pageserver_id=alt_pageserver_id)
# Verify that the neon.pageserver_connstring GUC is set to the correct thing
execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")