mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-12 07:52:55 +00:00
tests: use semaphore instead of lock for Endpoint.running (#8112)
## Problem Ahem, let's try this again. https://github.com/neondatabase/neon/pull/8110 had a spooky failure in test_multi_attach where a call to Endpoint.stop() timed out waiting for a lock, even though we can see an earlier call completing and releasing the lock. I suspect something weird is going on with the way pytest runs tests across processes, or use of asyncio perhaps. Anyway: the simplest fix is to just use a semaphore instead: if we don't lock we can't deadlock. ## Summary of changes - Make Endpoint.running a semaphore, where we add a unit to its counter when starting the process and atomically decrement it when stopping.
This commit is contained in:
@@ -3446,11 +3446,12 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
self.active_safekeepers: List[int] = list(map(lambda sk: sk.id, env.safekeepers))
|
||||
# path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
|
||||
|
||||
# This lock prevents concurrent start & stop operations, keeping `self.running` consistent
|
||||
# with whether we're really running. Tests generally wouldn't try and do these concurrently,
|
||||
# but endpoints are also stopped during test teardown, which might happen concurrently with
|
||||
# destruction of objects in tests.
|
||||
self.lock = threading.Lock()
|
||||
# Semaphore is set to 1 when we start, and acquire'd back to zero when we stop
|
||||
#
|
||||
# We use a semaphore rather than a bool so that racing calls to stop() don't
|
||||
# try and stop the same process twice, as stop() is called by test teardown and
|
||||
# potentially by some __del__ chains in other threads.
|
||||
self._running = threading.Semaphore(0)
|
||||
|
||||
def http_client(
|
||||
self, auth_token: Optional[str] = None, retries: Optional[Retry] = None
|
||||
@@ -3522,15 +3523,14 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
|
||||
log.info(f"Starting postgres endpoint {self.endpoint_id}")
|
||||
|
||||
with self.lock:
|
||||
self.env.neon_cli.endpoint_start(
|
||||
self.endpoint_id,
|
||||
safekeepers=self.active_safekeepers,
|
||||
remote_ext_config=remote_ext_config,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
)
|
||||
self.running = True
|
||||
self.env.neon_cli.endpoint_start(
|
||||
self.endpoint_id,
|
||||
safekeepers=self.active_safekeepers,
|
||||
remote_ext_config=remote_ext_config,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
)
|
||||
self._running.release(1)
|
||||
|
||||
return self
|
||||
|
||||
@@ -3578,9 +3578,12 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
conf_file.write("\n".join(hba) + "\n")
|
||||
conf_file.write(data)
|
||||
|
||||
if self.running:
|
||||
if self.is_running():
|
||||
self.safe_psql("SELECT pg_reload_conf()")
|
||||
|
||||
def is_running(self):
|
||||
return self._running._value > 0
|
||||
|
||||
def reconfigure(self, pageserver_id: Optional[int] = None):
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_reconfigure(self.endpoint_id, self.tenant_id, pageserver_id)
|
||||
@@ -3629,13 +3632,12 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
Returns self.
|
||||
"""
|
||||
|
||||
with self.lock:
|
||||
if self.running:
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, check_return_code=self.check_stop_result, mode=mode
|
||||
)
|
||||
self.running = False
|
||||
running = self._running.acquire(blocking=False)
|
||||
if running:
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, check_return_code=self.check_stop_result, mode=mode
|
||||
)
|
||||
|
||||
return self
|
||||
|
||||
@@ -3645,13 +3647,13 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
Returns self.
|
||||
"""
|
||||
|
||||
with self.lock:
|
||||
running = self._running.acquire(blocking=False)
|
||||
if running:
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, True, check_return_code=self.check_stop_result, mode=mode
|
||||
)
|
||||
self.endpoint_id = None
|
||||
self.running = False
|
||||
|
||||
return self
|
||||
|
||||
|
||||
Reference in New Issue
Block a user