Revert "extend test_change_pageserver for failure case, rework changing pageserver (#5693)"

This reverts commit b989ad1922.
This commit is contained in:
John Spray
2023-11-09 16:43:10 +00:00
parent 733877a8ff
commit 27e9cb91ed
3 changed files with 29 additions and 200 deletions

View File

@@ -710,12 +710,8 @@ impl ComputeNode {
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
#[instrument(skip_all)]
fn pg_reload_conf(&self) -> Result<()> {
let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
Command::new(pgctl_bin)
.args(["reload", "-D", &self.pgdata])
.output()
.expect("cannot run pg_ctl process");
fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
client.simple_query("SELECT pg_reload_conf()")?;
Ok(())
}
@@ -728,9 +724,9 @@ impl ComputeNode {
// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
self.pg_reload_conf()?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
self.pg_reload_conf(&mut client)?;
// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.

View File

@@ -19,10 +19,7 @@
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/buf_internals.h"
#include "storage/lwlock.h"
#include "storage/ipc.h"
#include "c.h"
#include "postmaster/interrupt.h"
#include "libpq-fe.h"
#include "libpq/pqformat.h"
@@ -64,63 +61,23 @@ int flush_every_n_requests = 8;
int n_reconnect_attempts = 0;
int max_reconnect_attempts = 60;
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
typedef struct
{
LWLockId lock;
pg_atomic_uint64 update_counter;
char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
} PagestoreShmemState;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void walproposer_shmem_request(void);
#endif
static shmem_startup_hook_type prev_shmem_startup_hook;
static PagestoreShmemState *pagestore_shared;
static uint64 pagestore_local_counter = 0;
static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
static bool pageserver_flush(void);
static void pageserver_disconnect(void);
static bool
CheckPageserverConnstring(char **newval, void **extra, GucSource source)
{
return strlen(*newval) < MAX_PAGESERVER_CONNSTRING_SIZE;
}
static pqsigfunc prev_signal_handler;
static void
AssignPageserverConnstring(const char *newval, void *extra)
pageserver_sighup_handler(SIGNAL_ARGS)
{
if(!pagestore_shared)
return;
LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE);
strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE);
pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1);
LWLockRelease(pagestore_shared->lock);
}
static bool
CheckConnstringUpdated()
{
if(!pagestore_shared)
return false;
return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
}
static void
ReloadConnstring()
{
if(!pagestore_shared)
return;
LWLockAcquire(pagestore_shared->lock, LW_SHARED);
strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter);
LWLockRelease(pagestore_shared->lock);
if (prev_signal_handler)
{
prev_signal_handler(postgres_signal_arg);
}
neon_log(LOG, "Received SIGHUP, disconnecting pageserver. New pageserver connstring is %s", page_server_connstring);
pageserver_disconnect();
}
static bool
@@ -134,11 +91,6 @@ pageserver_connect(int elevel)
Assert(!connected);
if(CheckConnstringUpdated())
{
ReloadConnstring();
}
/*
* Connect using the connection string we got from the
* neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment
@@ -158,7 +110,7 @@ pageserver_connect(int elevel)
n++;
}
keywords[n] = "dbname";
values[n] = local_pageserver_connstring;
values[n] = page_server_connstring;
n++;
keywords[n] = NULL;
values[n] = NULL;
@@ -302,12 +254,6 @@ pageserver_send(NeonRequest * request)
{
StringInfoData req_buff;
if(CheckConnstringUpdated())
{
pageserver_disconnect();
ReloadConnstring();
}
/* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
{
@@ -328,7 +274,6 @@ pageserver_send(NeonRequest * request)
{
while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{
HandleMainLoopInterrupts();
n_reconnect_attempts += 1;
pg_usleep(RECONNECT_INTERVAL_USEC);
}
@@ -446,8 +391,7 @@ pageserver_flush(void)
return true;
}
page_server_api api =
{
page_server_api api = {
.send = pageserver_send,
.flush = pageserver_flush,
.receive = pageserver_receive
@@ -461,72 +405,12 @@ check_neon_id(char **newval, void **extra, GucSource source)
return **newval == '\0' || HexDecodeString(id, *newval, 16);
}
static Size
PagestoreShmemSize(void)
{
return sizeof(PagestoreShmemState);
}
static bool
PagestoreShmemInit(void)
{
bool found;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
pagestore_shared = ShmemInitStruct("libpagestore shared state",
PagestoreShmemSize(),
&found);
if(!found)
{
pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock);
pg_atomic_init_u64(&pagestore_shared->update_counter, 0);
AssignPageserverConnstring(page_server_connstring, NULL);
}
LWLockRelease(AddinShmemInitLock);
return found;
}
static void
pagestore_shmem_startup_hook(void)
{
if(prev_shmem_startup_hook)
prev_shmem_startup_hook();
PagestoreShmemInit();
}
static void
pagestore_shmem_request(void)
{
#if PG_VERSION_NUM >= 150000
if(prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(PagestoreShmemSize());
RequestNamedLWLockTranche("neon_libpagestore", 1);
}
static void
pagestore_prepare_shmem(void)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = pagestore_shmem_request;
#else
pagestore_shmem_request();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = pagestore_shmem_startup_hook;
}
/*
* Module initialization function
*/
void
pg_init_libpagestore(void)
{
pagestore_prepare_shmem();
DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server",
NULL,
@@ -534,7 +418,7 @@ pg_init_libpagestore(void)
"",
PGC_SIGHUP,
0, /* no flags required */
CheckPageserverConnstring, AssignPageserverConnstring, NULL);
NULL, NULL, NULL);
DefineCustomStringVariable("neon.timeline_id",
"Neon timeline_id the server is running on",
@@ -615,5 +499,7 @@ pg_init_libpagestore(void)
redo_read_buffer_filter = neon_redo_read_buffer_filter;
}
prev_signal_handler = pqsignal(SIGHUP, pageserver_sighup_handler);
lfc_init();
}

View File

@@ -1,13 +1,9 @@
import asyncio
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.remote_storage import RemoteStorageKind
def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
num_connections = 3
neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
@@ -20,24 +16,15 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
alt_pageserver_id = env.pageservers[1].id
env.pageservers[1].tenant_attach(env.initial_tenant)
pg_conns = [endpoint.connect() for i in range(num_connections)]
curs = [pg_conn.cursor() for pg_conn in pg_conns]
def execute(statement: str):
for cur in curs:
cur.execute(statement)
def fetchone():
results = [cur.fetchone() for cur in curs]
assert all(result == results[0] for result in results)
return results[0]
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return answer
# from shared_buffers without hitting the page server, which defeats the point
# of this test.
curs[0].execute("CREATE TABLE foo (t text)")
curs[0].execute(
cur.execute("CREATE TABLE foo (t text)")
cur.execute(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
@@ -46,25 +33,25 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
)
# Verify that the table is larger than shared_buffers
curs[0].execute(
cur.execute(
"""
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
from pg_settings where name = 'shared_buffers'
"""
)
row = curs[0].fetchone()
row = cur.fetchone()
assert row is not None
log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
assert int(row[0]) < int(row[1])
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)
cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)
endpoint.reconfigure(pageserver_id=alt_pageserver_id)
# Verify that the neon.pageserver_connstring GUC is set to the correct thing
execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
connstring = fetchone()
cur.execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
connstring = cur.fetchone()
assert connstring is not None
expected_connstring = f"postgresql://no_user:@localhost:{env.pageservers[1].service_port.pg}"
assert expected_connstring == expected_connstring
@@ -73,45 +60,5 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
0
].stop() # Stop the old pageserver just to make sure we're reading from the new one
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)
# Try failing back, and this time we will stop the current pageserver before reconfiguring
# the endpoint. Whereas the previous reconfiguration was like a healthy migration, this
# is more like what happens in an unexpected pageserver failure.
env.pageservers[0].start()
env.pageservers[1].stop()
endpoint.reconfigure(pageserver_id=env.pageservers[0].id)
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)
env.pageservers[0].stop()
env.pageservers[1].start()
# Test a (former) bug where a child process spins without updating its connection string
# by executing a query separately. This query will hang until we issue the reconfigure.
async def reconfigure_async():
await asyncio.sleep(
1
) # Sleep for 1 second just to make sure we actually started our count(*) query
endpoint.reconfigure(pageserver_id=env.pageservers[1].id)
def execute_count():
execute("SELECT count(*) FROM FOO")
async def execute_and_reconfigure():
task_exec = asyncio.to_thread(execute_count)
task_reconfig = asyncio.create_task(reconfigure_async())
await asyncio.gather(
task_exec,
task_reconfig,
)
asyncio.run(execute_and_reconfigure())
assert fetchone() == (100000,)
# One final check that nothing hangs
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)
cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)