From cc138b56f983c8fc66e737c5e02898121fdd72ec Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Thu, 19 Dec 2024 04:45:06 -0500 Subject: [PATCH] fix(pageserver): run psql in thread to avoid blocking (#10177) ## Problem ref https://github.com/neondatabase/neon/issues/10170 ref https://github.com/neondatabase/neon/issues/9994 The psql command will block the main thread, causing other async tasks to timeout (i.e., HTTP connect). Therefore, we need to move it to an I/O executor thread. ## Summary of changes * run psql connection in a thread --------- Signed-off-by: Alex Chi Z Co-authored-by: John Spray --- .../regress/test_pageserver_layer_rolling.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/test_runner/regress/test_pageserver_layer_rolling.py b/test_runner/regress/test_pageserver_layer_rolling.py index 706da1e35e..fcc465f90a 100644 --- a/test_runner/regress/test_pageserver_layer_rolling.py +++ b/test_runner/regress/test_pageserver_layer_rolling.py @@ -22,7 +22,10 @@ CHECKPOINT_TIMEOUT_SECONDS = 60 async def run_worker_for_tenant( - env: NeonEnv, entries: int, tenant: TenantId, offset: int | None = None + env: NeonEnv, + entries: int, + tenant: TenantId, + offset: int | None = None, ) -> Lsn: if offset is None: offset = 0 @@ -37,12 +40,20 @@ async def run_worker_for_tenant( finally: await conn.close(timeout=10) - last_flush_lsn = Lsn(ep.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) + loop = asyncio.get_running_loop() + sql = await loop.run_in_executor( + None, lambda ep: ep.safe_psql("SELECT pg_current_wal_flush_lsn()"), ep + ) + last_flush_lsn = Lsn(sql[0][0]) return last_flush_lsn async def run_worker(env: NeonEnv, tenant_conf, entries: int) -> tuple[TenantId, TimelineId, Lsn]: - tenant, timeline = env.create_tenant(conf=tenant_conf) + loop = asyncio.get_running_loop() + # capture tenant_conf by specifying `tenant_conf=tenant_conf`, otherwise it will be evaluated to some random value + tenant, timeline = await loop.run_in_executor( + None, lambda tenant_conf, env: env.create_tenant(conf=tenant_conf), tenant_conf, env + ) last_flush_lsn = await run_worker_for_tenant(env, entries, tenant) return tenant, timeline, last_flush_lsn