# Workload shape for test_tenant_reattach_while_busy.
num_connections = 10
num_rows = 100000
updates_to_perform = 100000

# Progress counters shared by all update_table() workers. Both are reset by
# reattach_while_busy() at the start of every run.
#   updates_started   - update slots claimed by workers so far
#   updates_performed - UPDATE statements that completed successfully
updates_started = 0
updates_performed = 0


# Run random UPDATEs on test table. On failure, try again.
#
# Each worker opens its own connection with pg.connect_async() and keeps claiming
# update slots until updates_to_perform slots have been handed out in total. A
# claimed slot is retried until its UPDATE succeeds, so once all workers have
# returned, updates_performed == updates_to_perform.
async def update_table(pg: Postgres):
    global updates_started, updates_performed
    pg_conn = await pg.connect_async()

    try:
        while updates_started < updates_to_perform:
            # Claim the slot before the first await. Checking the counter that is
            # only bumped after the UPDATE would let several workers pass the
            # check for the last slot and overshoot updates_to_perform.
            updates_started += 1

            # Retry until this slot's UPDATE goes through.
            while True:
                # randint() is inclusive on both ends, matching
                # generate_series(1, num_rows) used to populate the table.
                row_id = random.randint(1, num_rows)
                try:
                    await pg_conn.fetchrow(
                        f"UPDATE t SET counter = counter + 1 WHERE id = {row_id}"
                    )
                except asyncpg.PostgresError as e:
                    # Received error from Postgres. Log it, sleep a little, and continue
                    log.info(f"UPDATE error: {e}")
                    await asyncio.sleep(0.1)
                else:
                    break

            updates_performed += 1
            if updates_performed % 100 == 0:
                log.info(f"update {updates_performed} / {updates_to_perform}")
    finally:
        # Don't leave the connection open after the worker is done.
        await pg_conn.close()


# Sleep 3 seconds (giving the update workers time to run), detach the tenant,
# sleep 1 more second, and attach it again.
#
# tenant_detach() / tenant_attach() are called without await, so no other task
# runs on the event loop while either call is in progress.
async def sleep_and_reattach(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
    await asyncio.sleep(3)
    log.info("Detaching tenant")
    pageserver_http.tenant_detach(tenant_id)
    await asyncio.sleep(1)
    log.info("Re-attaching tenant")
    pageserver_http.tenant_attach(tenant_id)
    log.info("Re-attach finished")


# async guts of test_tenant_reattach_while_busy test
#
# Starts num_connections update_table() workers plus one sleep_and_reattach()
# task, waits for all of them, and asserts that exactly updates_to_perform
# updates succeeded. 'env' is accepted but not used here.
async def reattach_while_busy(
    env: NeonEnv, pg: Postgres, pageserver_http: PageserverHttpClient, tenant_id: TenantId
):
    global updates_started, updates_performed

    # The counters are module globals. Reset them so that a second run in the
    # same process (the test is parametrized) does not start out "finished".
    updates_started = 0
    updates_performed = 0

    workers = [asyncio.create_task(update_table(pg)) for _ in range(num_connections)]
    workers.append(asyncio.create_task(sleep_and_reattach(pageserver_http, tenant_id)))
    await asyncio.gather(*workers)

    assert updates_performed == updates_to_perform


# Detach and re-attach tenant, while compute is busy running queries.
#
# Some of the queries may fail, in the window that the tenant has been detached but
# not yet re-attached. But Postgres itself should keep running, and when we just
# retry the queries, they should start working after the attach has finished.
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_tenant_reattach_while_busy(
    neon_env_builder: NeonEnvBuilder,
    remote_storage_kind: RemoteStorageKind,
):
    """Run the reattach_while_busy() workload against a fresh tenant and check the table.

    Runs once per remote storage kind returned by available_remote_storages().
    After the workload, table t must still hold num_rows rows and the counters
    must add up to updates_to_perform.
    """
    neon_env_builder.enable_remote_storage(
        remote_storage_kind=remote_storage_kind,
        test_name="test_tenant_reattach_while_busy",
    )

    env = neon_env_builder.init_start()
    http_client = env.pageserver.http_client()

    # Create a new tenant. The small checkpoint_distance makes it create
    # layers aggressively.
    tenant_conf = {"checkpoint_distance": "100000"}
    tenant_id, _timeline_id = env.neon_cli.create_tenant(conf=tenant_conf)

    compute = env.postgres.create_start("main", tenant_id=tenant_id)
    cursor = compute.connect().cursor()

    # Populate the table: ids 1..num_rows, every counter starting at zero.
    cursor.execute("CREATE TABLE t(id int primary key, counter int)")
    cursor.execute(f"INSERT INTO t SELECT generate_series(1,{num_rows}), 0")

    # Run the workload, with the detach/re-attach happening in the middle of it.
    asyncio.run(reattach_while_busy(env, compute, http_client, tenant_id))

    # Verify table contents: no rows lost, and every successful update counted once.
    row_count = query_scalar(cursor, "SELECT count(*) FROM t")
    assert row_count == num_rows
    counter_total = query_scalar(cursor, "SELECT sum(counter) FROM t")
    assert counter_total == updates_to_perform