diff --git a/pytest.ini b/pytest.ini index da9ab8c12f..104d0e0244 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,7 @@ [pytest] +filterwarnings = + error::pytest.PytestUnhandledThreadExceptionWarning + error::UserWarning addopts = -m 'not remote_cluster' markers = diff --git a/test_runner/batch_others/test_branch_and_gc.py b/test_runner/batch_others/test_branch_and_gc.py index 76a77357ae..8e433f65ad 100644 --- a/test_runner/batch_others/test_branch_and_gc.py +++ b/test_runner/batch_others/test_branch_and_gc.py @@ -167,3 +167,5 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv): # The starting LSN is invalid as the corresponding record is scheduled to be removed by in-queue GC. with pytest.raises(Exception, match="invalid branch start lsn"): env.neon_cli.create_branch('b1', 'b0', tenant_id=tenant, ancestor_start_lsn=lsn) + + thread.join() diff --git a/test_runner/batch_others/test_pageserver_api.py b/test_runner/batch_others/test_pageserver_api.py index 95791888a5..51df41699a 100644 --- a/test_runner/batch_others/test_pageserver_api.py +++ b/test_runner/batch_others/test_pageserver_api.py @@ -60,17 +60,38 @@ def check_client(client: NeonPageserverHttpClient, initial_tenant: UUID): def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv): env = neon_simple_env - client = env.pageserver.http_client() + with env.pageserver.http_client() as client: + tenant_id, timeline_id = env.neon_cli.create_tenant() - tenant_id, timeline_id = env.neon_cli.create_tenant() + timeline_details = client.timeline_detail(tenant_id=tenant_id, + timeline_id=timeline_id, + include_non_incremental_logical_size=True) - timeline_details = client.timeline_detail(tenant_id=tenant_id, - timeline_id=timeline_id, - include_non_incremental_logical_size=True) + assert timeline_details.get('wal_source_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running' + assert timeline_details.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running' + assert timeline_details.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running' - assert timeline_details.get('wal_source_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running' - assert timeline_details.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running' - assert timeline_details.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running' + +def expect_updated_msg_lsn(client: NeonPageserverHttpClient, + tenant_id: UUID, + timeline_id: UUID, + prev_msg_lsn: Optional[int]) -> int: + timeline_details = client.timeline_detail(tenant_id, timeline_id=timeline_id) + + # a successful `timeline_details` response must contain the below fields + local_timeline_details = timeline_details['local'] + assert "wal_source_connstr" in local_timeline_details.keys() + assert "last_received_msg_lsn" in local_timeline_details.keys() + assert "last_received_msg_ts" in local_timeline_details.keys() + + assert local_timeline_details["last_received_msg_lsn"] is not None, "the last received message's LSN is empty" + + last_msg_lsn = lsn_from_hex(local_timeline_details["last_received_msg_lsn"]) + assert prev_msg_lsn is None or prev_msg_lsn < last_msg_lsn, \ + f"the last received message's LSN {last_msg_lsn} hasn't been updated \ + compared to the previous message's LSN {prev_msg_lsn}" + + return last_msg_lsn # Test the WAL-receiver related fields in the response to `timeline_details` API call @@ -79,44 +100,29 @@ def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv): # `timeline_details` now. def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv): env = neon_simple_env - client = env.pageserver.http_client() + with env.pageserver.http_client() as client: + tenant_id, timeline_id = env.neon_cli.create_tenant() + pg = env.postgres.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id) - tenant_id, timeline_id = env.neon_cli.create_tenant() - pg = env.postgres.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id) + # Wait to make sure that we get a latest WAL receiver data. + # We need to wait here because it's possible that we don't have access to + # the latest WAL yet, when the `timeline_detail` API is first called. + # See: https://github.com/neondatabase/neon/issues/1768. + lsn = wait_until(number_of_iterations=5, + interval=1, + func=lambda: expect_updated_msg_lsn(client, tenant_id, timeline_id, None)) - def expect_updated_msg_lsn(prev_msg_lsn: Optional[int]) -> int: - timeline_details = client.timeline_detail(tenant_id, timeline_id=timeline_id) - - # a successful `timeline_details` response must contain the below fields - local_timeline_details = timeline_details['local'] - assert "wal_source_connstr" in local_timeline_details.keys() - assert "last_received_msg_lsn" in local_timeline_details.keys() - assert "last_received_msg_ts" in local_timeline_details.keys() - - assert local_timeline_details["last_received_msg_lsn"] is not None, "the last received message's LSN is empty" - - last_msg_lsn = lsn_from_hex(local_timeline_details["last_received_msg_lsn"]) - assert prev_msg_lsn is None or prev_msg_lsn < last_msg_lsn, \ - f"the last received message's LSN {last_msg_lsn} hasn't been updated \ - compared to the previous message's LSN {prev_msg_lsn}" - - return last_msg_lsn - - # Wait to make sure that we get a latest WAL receiver data. - # We need to wait here because it's possible that we don't have access to - # the latest WAL yet, when the `timeline_detail` API is first called. - # See: https://github.com/neondatabase/neon/issues/1768. - lsn = wait_until(number_of_iterations=5, interval=1, func=lambda: expect_updated_msg_lsn(None)) - - # Make a DB modification then expect getting a new WAL receiver's data. - pg.safe_psql("CREATE TABLE t(key int primary key, value text)") - wait_until(number_of_iterations=5, interval=1, func=lambda: expect_updated_msg_lsn(lsn)) + # Make a DB modification then expect getting a new WAL receiver's data. + pg.safe_psql("CREATE TABLE t(key int primary key, value text)") + wait_until(number_of_iterations=5, + interval=1, + func=lambda: expect_updated_msg_lsn(client, tenant_id, timeline_id, lsn)) def test_pageserver_http_api_client(neon_simple_env: NeonEnv): env = neon_simple_env - client = env.pageserver.http_client() - check_client(client, env.initial_tenant) + with env.pageserver.http_client() as client: + check_client(client, env.initial_tenant) def test_pageserver_http_api_client_auth_enabled(neon_env_builder: NeonEnvBuilder): @@ -125,5 +131,5 @@ def test_pageserver_http_api_client_auth_enabled(neon_env_builder: NeonEnvBuilde management_token = env.auth_keys.generate_management_token() - client = env.pageserver.http_client(auth_token=management_token) - check_client(client, env.initial_tenant) + with env.pageserver.http_client(auth_token=management_token) as client: + check_client(client, env.initial_tenant) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 9b39bf2b39..3848aee05a 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -222,7 +222,7 @@ def can_bind(host: str, port: int) -> bool: # moment. If that changes, we should use start using SO_REUSEADDR here # too, to allow reusing ports more quickly. # See https://github.com/neondatabase/neon/issues/801 - #sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: sock.bind((host, port)) @@ -231,6 +231,8 @@ def can_bind(host: str, port: int) -> bool: except socket.error: log.info(f"Port {port} is in use, skipping") return False + finally: + sock.close() class PortDistributor: @@ -2022,8 +2024,8 @@ class Safekeeper: started_at = time.time() while True: try: - http_cli = self.http_client() - http_cli.check_status() + with self.http_client() as http_cli: + http_cli.check_status() except Exception as e: elapsed = time.time() - started_at if elapsed > 3: @@ -2174,9 +2176,9 @@ class Etcd: return f'http://127.0.0.1:{self.port}' def check_status(self): - s = requests.Session() - s.mount('http://', requests.adapters.HTTPAdapter(max_retries=1)) # do not retry - s.get(f"{self.client_url()}/health").raise_for_status() + with requests.Session() as s: + s.mount('http://', requests.adapters.HTTPAdapter(max_retries=1)) # do not retry + s.get(f"{self.client_url()}/health").raise_for_status() def try_start(self): if self.handle is not None: