import random import time from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder def test_compute_pageserver_connection_stress(neon_env_builder: NeonEnvBuilder): env = neon_env_builder.init_start() env.pageserver.allowed_errors.append(".*simulated connection error.*") pageserver_http = env.pageserver.http_client() env.neon_cli.create_branch("test_compute_pageserver_connection_stress") endpoint = env.endpoints.create_start("test_compute_pageserver_connection_stress") # Enable failpoint after starting everything else up so that loading initial # basebackup doesn't fail pageserver_http.configure_failpoints(("simulated-bad-compute-connection", "50%return(15)")) pg_conn = endpoint.connect() cur = pg_conn.cursor() # Create table, and insert some rows. Make it big enough that it doesn't fit in # shared_buffers, otherwise the SELECT after restart will just return answer # from shared_buffers without hitting the page server, which defeats the point # of this test. cur.execute("CREATE TABLE foo (t text)") cur.execute( """ INSERT INTO foo SELECT 'long string to consume some space' || g FROM generate_series(1, 100000) g """ ) # Verify that the table is larger than shared_buffers cur.execute( """ select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size from pg_settings where name = 'shared_buffers' """ ) row = cur.fetchone() assert row is not None log.info(f"shared_buffers is {row[0]}, table size {row[1]}") assert int(row[0]) < int(row[1]) cur.execute("SELECT count(*) FROM foo") assert cur.fetchone() == (100000,) end_time = time.time() + 30 times_executed = 0 while time.time() < end_time: if random.random() < 0.5: cur.execute("INSERT INTO foo VALUES ('stas'), ('heikki')") else: cur.execute("SELECT t FROM foo ORDER BY RANDOM() LIMIT 10") cur.fetchall() times_executed += 1 log.info(f"Workload executed {times_executed} times")