Compare commits

...

5 Commits

Author SHA1 Message Date
Bojan Serafimov
2eb2620656 Add query cancel tests 2022-02-22 13:49:25 -05:00
Bojan Serafimov
99db785d5a Simplify 2022-02-22 12:59:49 -05:00
Bojan Serafimov
d32085cf1b Simplify 2022-02-22 12:48:46 -05:00
Bojan Serafimov
2a6efe4a48 Add TODOs 2022-02-22 11:30:00 -05:00
Bojan Serafimov
51a455d348 WIP proxy python test 2022-02-22 10:55:11 -05:00
2 changed files with 112 additions and 1 deletions

View File

@@ -0,0 +1,54 @@
import pytest
import subprocess
import signal
import time
def test_proxy_select_1(static_proxy):
static_proxy.safe_psql("select 1;")
def test_proxy_cancel(static_proxy):
"""Test that we can cancel a big generate_series query."""
conn = static_proxy.connect()
conn.cancel()
with conn.cursor() as cur:
from psycopg2.errors import QueryCanceled
with pytest.raises(QueryCanceled):
cur.execute("select * from generate_series(1, 100000000);")
def test_proxy_pgbench_cancel(static_proxy, pg_bin):
"""Test that we can cancel the init phase of pgbench."""
start_time = static_proxy.safe_psql("select now();")[0]
def get_running_queries():
magic_string = "fsdsdfhdfhfgbcbfgbfgbf"
with static_proxy.connect() as conn:
with conn.cursor() as cur:
cur.execute(f"""
-- {magic_string}
select query
from pg_stat_activity
where pg_stat_activity.query_start > %s
""", start_time)
return [
row[0]
for row in cur.fetchall()
if not magic_string in row[0]
]
# Let pgbench init run for 1 second
p = subprocess.Popen(['pgbench', '-s500', '-i', static_proxy.connstr()])
time.sleep(1)
# Make sure something is still running, and that get_running_queries works
assert len(get_running_queries()) > 0
# Send sigint, which would cancel any pgbench queries
p.send_signal(signal.SIGINT)
# Assert that nothing is running
time.sleep(1)
assert len(get_running_queries()) == 0

View File

@@ -237,10 +237,13 @@ def port_distributor(worker_base_port):
class PgProtocol:
""" Reusable connection logic """
def __init__(self, host: str, port: int, username: Optional[str] = None):
def __init__(self, host: str, port: int,
username: Optional[str] = None,
password: Optional[str] = None):
self.host = host
self.port = port
self.username = username
self.password = password
def connstr(self,
*,
@@ -252,6 +255,7 @@ class PgProtocol:
"""
username = username or self.username
password = password or self.password
res = f'host={self.host} port={self.port} dbname={dbname}'
if username:
@@ -1157,6 +1161,59 @@ def vanilla_pg(test_output_dir: str) -> Iterator[VanillaPostgres]:
yield vanilla_pg
class ZenithProxy(PgProtocol):
def __init__(self, port: int):
super().__init__(host="127.0.0.1", username="pytest", password="pytest", port=port)
self.running = False
def start_static(self, addr="127.0.0.1:5432") -> None:
assert not self.running
self.running = True
http_port = "7001"
args = [
# TODO is cargo run the right thing to do?
"cargo",
"run",
"--bin", "proxy",
"--",
"--http", http_port,
"--proxy", f"{self.host}:{self.port}",
"--auth-method", "password",
"--static-router", addr,
]
self.popen = subprocess.Popen(args)
# Readiness probe
requests.get(f"http://{self.host}:{http_port}/v1/status")
def stop(self) -> None:
assert self.running
self.running = False
# NOTE the process will die when we're done with tests anyway, because
# it's a child process. This is mostly to clean up in between different tests.
self.popen.kill()
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
if self.running:
self.stop()
@pytest.fixture(scope='function')
def static_proxy(vanilla_pg) -> Iterator[ZenithProxy]:
vanilla_pg.start()
vanilla_pg.safe_psql("create user pytest with password 'pytest';")
with ZenithProxy(4432) as proxy:
proxy.start_static()
yield proxy
class Postgres(PgProtocol):
""" An object representing a running postgres daemon. """
def __init__(self, env: ZenithEnv, tenant_id: uuid.UUID, port: int):