tests: use semaphore instead of lock for Endpoint.running (#8112)

## Problem

Ahem, let's try this again.

https://github.com/neondatabase/neon/pull/8110 had a spooky failure in
test_multi_attach where a call to Endpoint.stop() timed out waiting for
a lock, even though we can see an earlier call completing and releasing
the lock. I suspect something weird is going on with the way pytest runs
tests across processes, or use of asyncio perhaps.

Anyway: the simplest fix is to just use a semaphore instead: if we don't
lock we can't deadlock.

## Summary of changes

- Make Endpoint.running a semaphore, where we add a unit to its counter
when starting the process and atomically decrement it when stopping.
This commit is contained in:
John Spray
2024-06-19 17:07:14 +01:00
committed by GitHub
parent fd0b22f5cd
commit f0e2bb79b2

View File

@@ -3446,11 +3446,12 @@ class Endpoint(PgProtocol, LogUtils):
self.active_safekeepers: List[int] = list(map(lambda sk: sk.id, env.safekeepers))
# path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
# This lock prevents concurrent start & stop operations, keeping `self.running` consistent
# with whether we're really running. Tests generally wouldn't try and do these concurrently,
# but endpoints are also stopped during test teardown, which might happen concurrently with
# destruction of objects in tests.
self.lock = threading.Lock()
# Semaphore is set to 1 when we start, and acquire'd back to zero when we stop
#
# We use a semaphore rather than a bool so that racing calls to stop() don't
# try and stop the same process twice, as stop() is called by test teardown and
# potentially by some __del__ chains in other threads.
self._running = threading.Semaphore(0)
def http_client(
self, auth_token: Optional[str] = None, retries: Optional[Retry] = None
@@ -3522,15 +3523,14 @@ class Endpoint(PgProtocol, LogUtils):
log.info(f"Starting postgres endpoint {self.endpoint_id}")
with self.lock:
self.env.neon_cli.endpoint_start(
self.endpoint_id,
safekeepers=self.active_safekeepers,
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
)
self.running = True
self.env.neon_cli.endpoint_start(
self.endpoint_id,
safekeepers=self.active_safekeepers,
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
)
self._running.release(1)
return self
@@ -3578,9 +3578,12 @@ class Endpoint(PgProtocol, LogUtils):
conf_file.write("\n".join(hba) + "\n")
conf_file.write(data)
if self.running:
if self.is_running():
self.safe_psql("SELECT pg_reload_conf()")
def is_running(self):
return self._running._value > 0
def reconfigure(self, pageserver_id: Optional[int] = None):
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_reconfigure(self.endpoint_id, self.tenant_id, pageserver_id)
@@ -3629,13 +3632,12 @@ class Endpoint(PgProtocol, LogUtils):
Returns self.
"""
with self.lock:
if self.running:
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_stop(
self.endpoint_id, check_return_code=self.check_stop_result, mode=mode
)
self.running = False
running = self._running.acquire(blocking=False)
if running:
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_stop(
self.endpoint_id, check_return_code=self.check_stop_result, mode=mode
)
return self
@@ -3645,13 +3647,13 @@ class Endpoint(PgProtocol, LogUtils):
Returns self.
"""
with self.lock:
running = self._running.acquire(blocking=False)
if running:
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_stop(
self.endpoint_id, True, check_return_code=self.check_stop_result, mode=mode
)
self.endpoint_id = None
self.running = False
return self