Mirror of https://github.com/neondatabase/neon.git
Extract PostgreSQL connection logic into PgProtocol
This patch aims to:
* Unify connection & querying logic of ZenithPageserver and Postgres.
* Mitigate changes to transaction machinery introduced in `psycopg2 >= 2.9`.
Now it's possible to acquire a db connection using the corresponding
method:
```python
pg = postgres.create_start('main')
conn = pg.connect()
...
conn.close()
```
This pattern can be further improved with the help of `closing`:
```python
from contextlib import closing
pg = postgres.create_start('main')
with closing(pg.connect()) as conn:
...
```
All connections produced by this method will have autocommit
enabled by default.
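Since the same `PgProtocol` base is shared by `Postgres` and `ZenithPageserver`, one-shot queries can also go through the `safe_psql` helper, and autocommit can still be turned off per connection when a test needs to hold a transaction open. A short sketch based on the tests updated in this patch (`pg`/`pageserver` are the usual fixtures; the results shown are illustrative):
```python
from contextlib import closing

# One-shot query helper: opens a connection, runs the query, returns all rows.
# It behaves the same against a compute node and the page server.
pg.safe_psql('CREATE DATABASE regression')
assert pageserver.safe_psql('status') == [('hello world', )]

# autocommit is on by default, but can be disabled per connection,
# e.g. to keep key-share locks held across statements.
with closing(pg.connect(autocommit=False)) as conn:
    conn.cursor().execute('select * from t1 for key share')
```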
@@ -1,5 +1,3 @@
import psycopg2

pytest_plugins = ("fixtures.zenith_fixtures")


@@ -13,8 +11,7 @@ def test_branch_behind(zenith_cli, pageserver, postgres, pg_bin):
pgmain = postgres.create_start('test_branch_behind')
print("postgres is running on 'test_branch_behind' branch")

main_pg_conn = psycopg2.connect(pgmain.connstr())
main_pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
main_pg_conn = pgmain.connect()
main_cur = main_pg_conn.cursor()

# Create table, and insert the first 100 rows

@@ -60,15 +57,13 @@ def test_branch_behind(zenith_cli, pageserver, postgres, pg_bin):
pg_more = postgres.create_start("test_branch_behind_more")

# On the 'hundred' branch, we should see only 100 rows
hundred_pg_conn = psycopg2.connect(pg_hundred.connstr())
hundred_pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
hundred_pg_conn = pg_hundred.connect()
hundred_cur = hundred_pg_conn.cursor()
hundred_cur.execute('SELECT count(*) FROM foo')
assert hundred_cur.fetchone() == (100, )

# On the 'more' branch, we should see 100200 rows
more_pg_conn = psycopg2.connect(pg_more.connstr())
more_pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
more_pg_conn = pg_more.connect()
more_cur = more_pg_conn.cursor()
more_cur.execute('SELECT count(*) FROM foo')
assert more_cur.fetchone() == (100100, )
@@ -1,4 +1,4 @@
import psycopg2
from contextlib import closing

pytest_plugins = ("fixtures.zenith_fixtures")

@@ -14,9 +14,7 @@ def test_config(zenith_cli, pageserver, postgres, pg_bin):
pg = postgres.create_start('test_config', config_lines=['log_min_messages=debug1'])
print('postgres is running on test_config branch')

with psycopg2.connect(pg.connstr()) as conn:
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute('''
SELECT setting
@@ -1,4 +1,4 @@
import psycopg2
from contextlib import closing

pytest_plugins = ("fixtures.zenith_fixtures")

@@ -12,9 +12,7 @@ def test_createdb(zenith_cli, pageserver, postgres, pg_bin):
pg = postgres.create_start('test_createdb')
print("postgres is running on 'test_createdb' branch")

with psycopg2.connect(pg.connstr()) as conn:
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute('VACUUM FULL pg_class')

@@ -31,4 +29,4 @@ def test_createdb(zenith_cli, pageserver, postgres, pg_bin):

# Test that you can connect to the new database on both branches
for db in (pg, pg2):
psycopg2.connect(db.connstr('foodb')).close()
db.connect(dbname='foodb').close()
@@ -1,4 +1,4 @@
import psycopg2
from contextlib import closing

pytest_plugins = ("fixtures.zenith_fixtures")

@@ -12,9 +12,7 @@ def test_createuser(zenith_cli, pageserver, postgres, pg_bin):
pg = postgres.create_start('test_createuser')
print("postgres is running on 'test_createuser' branch")

with psycopg2.connect(pg.connstr()) as conn:
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute('CREATE USER testuser with password %s', ('testpwd', ))

@@ -30,7 +28,4 @@ def test_createuser(zenith_cli, pageserver, postgres, pg_bin):
pg2 = postgres.create_start('test_createuser2')

# Test that you can connect to new branch as a new user
conn2 = psycopg2.connect(pg2.connstr(username='testuser'))
with conn2.cursor() as cur:
cur.execute('select current_user;')
assert cur.fetchone() == ('testuser', )
assert pg2.safe_psql('select current_user', username='testuser') == [('testuser', )]
@@ -1,5 +1,3 @@
import psycopg2

pytest_plugins = ("fixtures.zenith_fixtures")


@@ -15,8 +13,7 @@ def test_multixact(pageserver, postgres, pg_bin, zenith_cli, base_dir):
pg = postgres.create_start('test_multixact')

print("postgres is running on 'test_multixact' branch")
pg_conn = psycopg2.connect(pg.connstr())
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
pg_conn = pg.connect()
cur = pg_conn.cursor()

cur.execute('''

@@ -31,10 +28,10 @@ def test_multixact(pageserver, postgres, pg_bin, zenith_cli, base_dir):
nclients = 3
connections = []
for i in range(nclients):
con = psycopg2.connect(pg.connstr())
# Do not turn on autocommit. We want to hold the key-share locks.
con.cursor().execute('select * from t1 for key share')
connections.append(con)
conn = pg.connect(autocommit=False)
conn.cursor().execute('select * from t1 for key share')
connections.append(conn)

# We should have a multixact now. We can close the connections.
for c in connections:

@@ -56,8 +53,7 @@ def test_multixact(pageserver, postgres, pg_bin, zenith_cli, base_dir):
pg_new = postgres.create_start('test_multixact_new')

print("postgres is running on 'test_multixact_new' branch")
pg_new_conn = psycopg2.connect(pg_new.connstr())
pg_new_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
pg_new_conn = pg_new.connect()
cur_new = pg_new_conn.cursor()

cur_new.execute('SELECT next_multixact_id FROM pg_control_checkpoint()')
@@ -1,28 +1,23 @@
import psycopg2
import json

pytest_plugins = ("fixtures.zenith_fixtures")


def test_status(pageserver):
pg_conn = psycopg2.connect(pageserver.connstr())
pg_conn.autocommit = True
cur = pg_conn.cursor()
cur.execute('status')
assert cur.fetchone() == ('hello world', )
pg_conn.close()
assert pageserver.safe_psql('status') == [
('hello world', ),
]


def test_branch_list(pageserver, zenith_cli):
# Create a branch for us
zenith_cli.run(["branch", "test_branch_list_main", "empty"])

page_server_conn = psycopg2.connect(pageserver.connstr())
page_server_conn.autocommit = True
page_server_cur = page_server_conn.cursor()
conn = pageserver.connect()
cur = conn.cursor()

page_server_cur.execute('branch_list')
branches = json.loads(page_server_cur.fetchone()[0])
cur.execute('branch_list')
branches = json.loads(cur.fetchone()[0])
# Filter out branches created by other tests
branches = [x for x in branches if x['name'].startswith('test_branch_list')]

@@ -37,8 +32,8 @@ def test_branch_list(pageserver, zenith_cli):
zenith_cli.run(['branch', 'test_branch_list_experimental', 'test_branch_list_main'])
zenith_cli.run(['pg', 'create', 'test_branch_list_experimental'])

page_server_cur.execute('branch_list')
new_branches = json.loads(page_server_cur.fetchone()[0])
cur.execute('branch_list')
new_branches = json.loads(cur.fetchone()[0])
# Filter out branches created by other tests
new_branches = [x for x in new_branches if x['name'].startswith('test_branch_list')]
assert len(new_branches) == 2

@@ -50,4 +45,4 @@ def test_branch_list(pageserver, zenith_cli):
# TODO: do the LSNs have to match here?
assert new_branches[1] == branches[0]

page_server_conn.close()
conn.close()
@@ -1,4 +1,4 @@
import psycopg2
from contextlib import closing

pytest_plugins = ("fixtures.zenith_fixtures")

@@ -12,39 +12,31 @@ def test_restart_compute(zenith_cli, pageserver, postgres, pg_bin):
pg = postgres.create_start('test_restart_compute')
print("postgres is running on 'test_restart_compute' branch")

pg_conn = psycopg2.connect(pg.connstr())
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = pg_conn.cursor()

# Create table, and insert a row
cur.execute('CREATE TABLE foo (t text)')
cur.execute("INSERT INTO foo VALUES ('bar')")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# Create table, and insert a row
cur.execute('CREATE TABLE foo (t text)')
cur.execute("INSERT INTO foo VALUES ('bar')")

# Stop and restart the Postgres instance
pg_conn.close()
pg.stop()
pg.start()
pg_conn = psycopg2.connect(pg.connstr())
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = pg_conn.cursor()
pg.stop().start()

# We can still see the row
cur.execute('SELECT count(*) FROM foo')
assert cur.fetchone() == (1, )
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the row
cur.execute('SELECT count(*) FROM foo')
assert cur.fetchone() == (1, )

# Insert another row
cur.execute("INSERT INTO foo VALUES ('bar2')")
cur.execute('SELECT count(*) FROM foo')
assert cur.fetchone() == (2, )
# Insert another row
cur.execute("INSERT INTO foo VALUES ('bar2')")
cur.execute('SELECT count(*) FROM foo')
assert cur.fetchone() == (2, )

# Stop, and destroy the Postgres instance. Then recreate and restart it.
pg_conn.close()
pg.stop_and_destroy()
pg.create_start('test_restart_compute')
pg_conn = psycopg2.connect(pg.connstr())
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = pg_conn.cursor()
pg.stop_and_destroy().create_start('test_restart_compute')

# We can still see the rows
cur.execute('SELECT count(*) FROM foo')
assert cur.fetchone() == (2, )
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the rows
cur.execute('SELECT count(*) FROM foo')
assert cur.fetchone() == (2, )
@@ -1,5 +1,3 @@
import psycopg2

pytest_plugins = ("fixtures.zenith_fixtures")


@@ -12,8 +10,7 @@ def test_twophase(zenith_cli, pageserver, postgres, pg_bin):
pg = postgres.create_start('test_twophase', config_lines=['max_prepared_transactions=5'])
print("postgres is running on 'test_twophase' branch")

conn = psycopg2.connect(pg.connstr())
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
conn = pg.connect()
cur = conn.cursor()

cur.execute('CREATE TABLE foo (t text)')

@@ -33,8 +30,7 @@ def test_twophase(zenith_cli, pageserver, postgres, pg_bin):

pg2 = postgres.create_start('test_twophase_prepared',
config_lines=['max_prepared_transactions=5'])
conn2 = psycopg2.connect(pg2.connstr())
conn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
conn2 = pg2.connect()
cur2 = conn2.cursor()

# On the new branch, commit one of the prepared transactions, abort the other one.
@@ -1,8 +1,8 @@
import psycopg2
import pytest
import random
import time

from contextlib import closing
from multiprocessing import Process, Value

pytest_plugins = ("fixtures.zenith_fixtures")

@@ -16,15 +16,14 @@ def test_normal_work(zenith_cli, pageserver, postgres, wa_factory):
pg = postgres.create_start('test_wal_acceptors_normal_work',
wal_acceptors=wa_factory.get_connstrs())

pg_conn = psycopg2.connect(pg.connstr())
# do commit after each statement as waiting for acceptors happens there
pg_conn.autocommit = True

cur = pg_conn.cursor()
cur.execute('CREATE TABLE t(key int primary key, value text)')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute('SELECT sum(key) FROM t')
assert cur.fetchone() == (5000050000, )
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
cur.execute('CREATE TABLE t(key int primary key, value text)')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute('SELECT sum(key) FROM t')
assert cur.fetchone() == (5000050000, )


# Run page server and multiple acceptors, and multiple compute nodes running

@@ -72,10 +71,9 @@ def test_restarts(zenith_cli, pageserver, postgres, wa_factory):
pg = postgres.create_start('test_wal_acceptors_restarts',
wal_acceptors=wa_factory.get_connstrs())

pg_conn = psycopg2.connect(pg.connstr())
# do commit after each statement as waiting for acceptors happens there
pg_conn.autocommit = True

# we rely upon autocommit after each statement
# as waiting for acceptors happens there
pg_conn = pg.connect()
cur = pg_conn.cursor()

failed_node = None

@@ -110,9 +108,9 @@ def test_unavailability(zenith_cli, pageserver, postgres, wa_factory):
pg = postgres.create_start('test_wal_acceptors_unavailability',
wal_acceptors=wa_factory.get_connstrs())

pg_conn = psycopg2.connect(pg.connstr())
# do commit after each statement as waiting for acceptors happens there
pg_conn.autocommit = True
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
pg_conn = pg.connect()
cur = pg_conn.cursor()

# check basic work with table

@@ -181,9 +179,9 @@ def test_race_conditions(zenith_cli, pageserver, postgres, wa_factory, stop_valu
pg = postgres.create_start('test_wal_acceptors_race_conditions',
wal_acceptors=wa_factory.get_connstrs())

pg_conn = psycopg2.connect(pg.connstr())
# do commit after each statement as waiting for acceptors happens there
pg_conn.autocommit = True
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
pg_conn = pg.connect()
cur = pg_conn.cursor()

cur.execute('CREATE TABLE t(key int primary key, value text)')
@@ -1,4 +1,3 @@
import psycopg2
import json

pytest_plugins = ("fixtures.zenith_fixtures")

@@ -24,8 +23,7 @@ def helper_compare_branch_list(page_server_cur, zenith_cli):

def test_cli_branch_list(pageserver, zenith_cli):

page_server_conn = psycopg2.connect(pageserver.connstr())
page_server_conn.autocommit = True
page_server_conn = pageserver.connect()
page_server_cur = page_server_conn.cursor()

# Initial sanity check
@@ -1,5 +1,4 @@
import os
import psycopg2

from fixtures.utils import mkdir_if_needed

@@ -15,11 +14,7 @@ def test_isolation(pageserver, postgres, pg_bin, zenith_cli, test_output_dir, pg
# Connect to postgres and create a database called "regression".
# isolation tests use prepared transactions, so enable them
pg = postgres.create_start('test_isolation', config_lines=['max_prepared_transactions=100'])
pg_conn = psycopg2.connect(pg.connstr())
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = pg_conn.cursor()
cur.execute('CREATE DATABASE isolation_regression')
pg_conn.close()
pg.safe_psql('CREATE DATABASE isolation_regression')

# Create some local directories for pg_isolation_regress to run in.
runpath = os.path.join(test_output_dir, 'regress')
@@ -1,5 +1,4 @@
import os
import psycopg2

from fixtures.utils import mkdir_if_needed

@@ -14,11 +13,7 @@ def test_pg_regress(pageserver, postgres, pg_bin, zenith_cli, test_output_dir, p

# Connect to postgres and create a database called "regression".
pg = postgres.create_start('test_pg_regress')
pg_conn = psycopg2.connect(pg.connstr())
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = pg_conn.cursor()
cur.execute('CREATE DATABASE regression')
pg_conn.close()
pg.safe_psql('CREATE DATABASE regression')

# Create some local directories for pg_regress to run in.
runpath = os.path.join(test_output_dir, 'regress')
@@ -1,5 +1,4 @@
import os
import psycopg2

from fixtures.utils import mkdir_if_needed

@@ -14,11 +13,7 @@ def test_zenith_regress(pageserver, postgres, pg_bin, zenith_cli, test_output_di

# Connect to postgres and create a database called "regression".
pg = postgres.create_start('test_zenith_regress')
pg_conn = psycopg2.connect(pg.connstr())
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = pg_conn.cursor()
cur.execute('CREATE DATABASE regression')
pg_conn.close()
pg.safe_psql('CREATE DATABASE regression')

# Create some local directories for pg_regress to run in.
runpath = os.path.join(test_output_dir, 'regress')
@@ -1,14 +1,17 @@
import getpass
import os
import signal
import psycopg2
import pytest
import shutil
import signal
import subprocess
import psycopg2

from contextlib import closing
from pathlib import Path

from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar
# Type-related stuff
from psycopg2.extensions import connection as PgConnection
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast
from typing_extensions import Literal

from .utils import (get_self_dir, mkdir_if_needed, subprocess_capture)

@@ -75,6 +78,48 @@ def safety_check() -> None:
raise Exception('found interfering processes running')


class PgProtocol:
""" Reusable connection logic """
def __init__(self, host: str, port: int, username: Optional[str] = None):
self.host = host
self.port = port
self.username = username or getpass.getuser()

def connstr(self, *, dbname: str = 'postgres', username: Optional[str] = None) -> str:
"""
Build a libpq connection string for the Postgres instance.
"""

username = username or self.username
return f'host={self.host} port={self.port} user={username} dbname={dbname}'

# autocommit=True here by default because that's what we need most of the time
def connect(self, *, autocommit=True, **kwargs: Any) -> PgConnection:
"""
Connect to the node.
Returns psycopg2's connection object.
This method passes all extra params to connstr.
"""

conn = psycopg2.connect(self.connstr(**kwargs))
# WARNING: this setting affects *all* tests!
conn.autocommit = autocommit
return conn

def safe_psql(self, query: str, **kwargs: Any) -> List[Any]:
"""
Execute query against the node and return all rows.
This method passes all extra params to connstr.
"""

with closing(self.connect(**kwargs)) as conn:
with conn.cursor() as cur:
cur.execute(query)
if cur.description is None:
return [] # query didn't return data
return cast(List[Any], cur.fetchall())


class ZenithCli:
"""
An object representing the CLI binary named "zenith".

@@ -120,9 +165,11 @@ def zenith_cli(zenith_binpath: str, repo_dir: str, pg_distrib_dir: str) -> Zenit
return ZenithCli(zenith_binpath, repo_dir, pg_distrib_dir)


class ZenithPageserver:
class ZenithPageserver(PgProtocol):
""" An object representing a running pageserver. """
def __init__(self, zenith_cli: ZenithCli):
super().__init__(host='localhost', port=DEFAULT_PAGESERVER_PORT)

self.zenith_cli = zenith_cli
self.running = False

@@ -157,16 +204,6 @@ class ZenithPageserver:

return self

# The page server speaks the Postgres FE/BE protocol, so you can connect
# to it with any Postgres client, and run special commands. This function
# returns a libpq connection string for connecting to it.
def connstr(self) -> str:
username = getpass.getuser()
conn_str = 'host={} port={} dbname=postgres user={}'.format('localhost',
DEFAULT_PAGESERVER_PORT,
username)
return conn_str


@zenfixture
def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]:

@@ -193,15 +230,14 @@ def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]:
ps.stop()


class Postgres:
class Postgres(PgProtocol):
""" An object representing a running postgres daemon. """
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, instance_num: int):
super().__init__(host='localhost', port=55431 + instance_num)

self.zenith_cli = zenith_cli
self.instance_num = instance_num
self.running = False
self.username = getpass.getuser()
self.host = 'localhost'
self.port = 55431 + instance_num # TODO: find a better way
self.repo_dir = repo_dir
self.branch: Optional[str] = None # dubious, see asserts below
# path to conf is <repo_dir>/pgdatadirs/<branch_name>/postgresql.conf

@@ -242,18 +278,18 @@ class Postgres:

return self

""" Path to postgresql.conf """

def config_file_path(self) -> str:
filename = 'pgdatadirs/{}/postgresql.conf'.format(self.branch)
""" Path to postgresql.conf """
filename = f'pgdatadirs/{self.branch}/postgresql.conf'
return os.path.join(self.repo_dir, filename)

"""
Adjust instance config for working with wal acceptors instead of
pageserver (pre-configured by CLI) directly.
"""
def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres':
"""
Adjust instance config for working with wal acceptors instead of
pageserver (pre-configured by CLI) directly.
"""

def adjust_for_wal_acceptors(self, wal_acceptors) -> 'Postgres':
# TODO: reuse config()
with open(self.config_file_path(), "r") as f:
cfg_lines = f.readlines()
with open(self.config_file_path(), "w") as f:

@@ -319,33 +355,11 @@ class Postgres:

return self

def connstr(self, dbname: str = 'postgres', username: Optional[str] = None) -> str:
"""
Build a libpq connection string for the Postgres instance.
"""

conn_str = 'host={} port={} dbname={} user={}'.format(self.host, self.port, dbname,
(username or self.username))

return conn_str

def safe_psql(self, query, dbname='postgres', username=None):
"""
Execute query against the node and return all (fetchall) results
"""
with psycopg2.connect(self.connstr(dbname, username)) as conn:
with conn.cursor() as curs:
curs.execute(query)
if curs.description is None:
return [] # query didn't return data
return curs.fetchall()


class PostgresFactory:
""" An object representing multiple running postgres daemons. """
def __init__(self, zenith_cli: ZenithCli, repo_dir: str):
self.zenith_cli = zenith_cli
self.host = 'localhost'
self.repo_dir = repo_dir
self.num_instances = 0
self.instances: List[Postgres] = []

@@ -439,17 +453,13 @@ def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
return PgBin(test_output_dir, pg_distrib_dir)


""" Read content of file into number """


def read_pid(path):
""" Read content of file into number """
return int(Path(path).read_text())


""" An object representing a running wal acceptor daemon. """


class WalAcceptor:
""" An object representing a running wal acceptor daemon. """
def __init__(self, wa_binpath, data_dir, port, num):
self.wa_binpath = wa_binpath
self.data_dir = data_dir

@@ -504,11 +514,12 @@ class WalAcceptorFactory:
self.data_dir = data_dir
self.instances = []
self.initial_port = 54321

def start_new(self) -> WalAcceptor:
"""
Start new wal acceptor.
"""

def start_new(self) -> WalAcceptor:
wa_num = len(self.instances)
wa = WalAcceptor(self.wa_binpath,
os.path.join(self.data_dir, "wal_acceptor_{}".format(wa_num)),

@@ -517,11 +528,11 @@ class WalAcceptorFactory:
self.instances.append(wa)
return wa

"""
Start n new wal acceptors
"""
def start_n_new(self, n: int) -> None:
"""
Start n new wal acceptors.
"""

def start_n_new(self, n):
for _ in range(n):
self.start_new()

@@ -530,14 +541,13 @@ class WalAcceptorFactory:
wa.stop()
return self

""" Get list of wal acceptor endpoints suitable for wal_acceptors GUC """

def get_connstrs(self) -> str:
""" Get list of wal acceptor endpoints suitable for wal_acceptors GUC """
return ','.join(["127.0.0.1:{}".format(wa.port) for wa in self.instances])


@zenfixture
def wa_factory(zenith_binpath, repo_dir) -> Iterator[WalAcceptorFactory]:
def wa_factory(zenith_binpath: str, repo_dir: str) -> Iterator[WalAcceptorFactory]:
""" Gives WalAcceptorFactory providing wal acceptors. """
wafactory = WalAcceptorFactory(zenith_binpath, os.path.join(repo_dir, "wal_acceptors"))
yield wafactory