use only s3 in boto3-stubs and update mypy

The newer mypy version fixes the spurious error we hit when trying to
update only the boto3 stubs. However, it brings new checks and starts
to complain when we index into the result of cursor.fetchone() without
checking for None first. So this introduces a query_scalar wrapper to
simplify querying for scalar values. I tried the cursor_factory
connection argument, but without success. There may be a better way to
do this, but the wrapper looks like the simplest.
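
To illustrate, here is a minimal sketch of the pattern in question (the
real helper lands in fixtures/utils.py in this diff; psycopg2's cursor
type is assumed):

from typing import Any

from psycopg2.extensions import cursor

def query_scalar(cur: cursor, query: str) -> Any:
    # The assert narrows Optional[Tuple[Any, ...]] to a tuple,
    # which satisfies the stricter Optional checks in mypy 0.971.
    cur.execute(query)
    row = cur.fetchone()
    assert row is not None
    return row[0]

# Before (mypy 0.971 rejects this, since fetchone() may return None):
#     cur.execute("SELECT pg_current_wal_insert_lsn()")
#     lsn = cur.fetchone()[0]
# After:
#     lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")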
Dmitry Rodionov
2022-07-28 19:46:26 +03:00
committed by Dmitry Rodionov
parent e73b95a09d
commit 092a9b74d3
28 changed files with 871 additions and 1410 deletions

poetry.lock generated (1729 changed lines)

File diff suppressed because one or more lines are too long

View File

@@ -8,7 +8,7 @@ authors = []
python = "^3.9"
pytest = "^6.2.5"
psycopg2-binary = "^2.9.1"
typing-extensions = "^3.10.0"
typing-extensions = "^4.1.0"
PyJWT = {version = "^2.1.0", extras = ["crypto"]}
requests = "^2.26.0"
pytest-xdist = "^2.3.0"
@@ -16,10 +16,10 @@ asyncpg = "^0.24.0"
aiopg = "^1.3.1"
cached-property = "^1.5.2"
Jinja2 = "^3.0.2"
types-requests = "^2.27.7"
types-psycopg2 = "^2.9.6"
types-requests = "^2.28.5"
types-psycopg2 = "^2.9.18"
boto3 = "^1.20.40"
boto3-stubs = "^1.20.40"
boto3-stubs = {version = "^1.23.38", extras = ["s3"]}
moto = {version = "^3.0.0", extras = ["server"]}
backoff = "^1.11.1"
pytest-lazy-fixture = "^0.6.3"
@@ -29,7 +29,7 @@ pytest-timeout = "^2.1.0"
[tool.poetry.dev-dependencies]
yapf = "==0.31.0"
flake8 = "^3.9.2"
mypy = "==0.910"
mypy = "==0.971"
[build-system]
requires = ["poetry-core>=1.0.0"]
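
For reference, a sketch of the equivalent install outside poetry
(boto3-stubs publishes one extras group per AWS service, so pulling in
only "s3" avoids installing stubs for every service):

pip install 'boto3-stubs[s3]'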

View File

@@ -1,6 +1,7 @@
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverApiException
from fixtures.utils import query_scalar
#
@@ -25,13 +26,11 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
pg_branch0 = env.postgres.create_start('main', tenant_id=tenant)
branch0_cur = pg_branch0.connect().cursor()
branch0_cur.execute("SHOW neon.timeline_id")
branch0_timeline = branch0_cur.fetchone()[0]
branch0_timeline = query_scalar(branch0_cur, "SHOW neon.timeline_id")
log.info(f"b0 timeline {branch0_timeline}")
# Create table, and insert 100k rows.
branch0_cur.execute('SELECT pg_current_wal_insert_lsn()')
branch0_lsn = branch0_cur.fetchone()[0]
branch0_lsn = query_scalar(branch0_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f"b0 at lsn {branch0_lsn}")
branch0_cur.execute('CREATE TABLE foo (t text) WITH (autovacuum_enabled = off)')
@@ -40,8 +39,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
SELECT '00112233445566778899AABBCCDDEEFF' || ':branch0:' || g
FROM generate_series(1, 100000) g
''')
branch0_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_100 = branch0_cur.fetchone()[0]
lsn_100 = query_scalar(branch0_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f'LSN after 100k rows: {lsn_100}')
# Create branch1.
@@ -50,12 +48,10 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
log.info("postgres is running on 'branch1' branch")
branch1_cur = pg_branch1.connect().cursor()
branch1_cur.execute("SHOW neon.timeline_id")
branch1_timeline = branch1_cur.fetchone()[0]
branch1_timeline = query_scalar(branch1_cur, "SHOW neon.timeline_id")
log.info(f"b1 timeline {branch1_timeline}")
branch1_cur.execute('SELECT pg_current_wal_insert_lsn()')
branch1_lsn = branch1_cur.fetchone()[0]
branch1_lsn = query_scalar(branch1_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f"b1 at lsn {branch1_lsn}")
# Insert 100k rows.
@@ -64,8 +60,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
SELECT '00112233445566778899AABBCCDDEEFF' || ':branch1:' || g
FROM generate_series(1, 100000) g
''')
branch1_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_200 = branch1_cur.fetchone()[0]
lsn_200 = query_scalar(branch1_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f'LSN after 200k rows: {lsn_200}')
# Create branch2.
@@ -74,12 +69,10 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
log.info("postgres is running on 'branch2' branch")
branch2_cur = pg_branch2.connect().cursor()
branch2_cur.execute("SHOW neon.timeline_id")
branch2_timeline = branch2_cur.fetchone()[0]
branch2_timeline = query_scalar(branch2_cur, "SHOW neon.timeline_id")
log.info(f"b2 timeline {branch2_timeline}")
branch2_cur.execute('SELECT pg_current_wal_insert_lsn()')
branch2_lsn = branch2_cur.fetchone()[0]
branch2_lsn = query_scalar(branch2_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f"b2 at lsn {branch2_lsn}")
# Insert 100k rows.
@@ -88,20 +81,16 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
SELECT '00112233445566778899AABBCCDDEEFF' || ':branch2:' || g
FROM generate_series(1, 100000) g
''')
branch2_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_300 = branch2_cur.fetchone()[0]
lsn_300 = query_scalar(branch2_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f'LSN after 300k rows: {lsn_300}')
# Run compaction on branch1.
psconn = env.pageserver.connect()
log.info(f'compact {tenant.hex} {branch1_timeline} {lsn_200}')
psconn.cursor().execute(f'''compact {tenant.hex} {branch1_timeline} {lsn_200}''')
compact = f'compact {tenant.hex} {branch1_timeline} {lsn_200}'
log.info(compact)
env.pageserver.safe_psql(compact)
branch0_cur.execute('SELECT count(*) FROM foo')
assert branch0_cur.fetchone() == (100000, )
assert query_scalar(branch0_cur, 'SELECT count(*) FROM foo') == 100000
branch1_cur.execute('SELECT count(*) FROM foo')
assert branch1_cur.fetchone() == (200000, )
assert query_scalar(branch1_cur, 'SELECT count(*) FROM foo') == 200000
branch2_cur.execute('SELECT count(*) FROM foo')
assert branch2_cur.fetchone() == (300000, )
assert query_scalar(branch2_cur, 'SELECT count(*) FROM foo') == 300000

View File

@@ -3,7 +3,7 @@ import pytest
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import lsn_from_hex
from fixtures.utils import lsn_from_hex, query_scalar
# Test the GC implementation when running with branching.
@@ -76,20 +76,17 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')"
)
main_cur.execute('INSERT INTO foo SELECT FROM generate_series(1, 100000)')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn1 = main_cur.fetchone()[0]
lsn1 = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f'LSN1: {lsn1}')
main_cur.execute('INSERT INTO foo SELECT FROM generate_series(1, 100000)')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn2 = main_cur.fetchone()[0]
lsn2 = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f'LSN2: {lsn2}')
# Set the GC horizon so that lsn1 is inside the horizon, which means
# we can create a new branch starting from lsn1.
env.pageserver.safe_psql(
f'''do_gc {tenant.hex} {timeline_main.hex} {lsn_from_hex(lsn2) - lsn_from_hex(lsn1) + 1024}'''
)
f'do_gc {tenant.hex} {timeline_main.hex} {lsn_from_hex(lsn2) - lsn_from_hex(lsn1) + 1024}')
env.neon_cli.create_branch('test_branch',
'test_main',
@@ -100,8 +97,7 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
branch_cur = pg_branch.connect().cursor()
branch_cur.execute('INSERT INTO foo SELECT FROM generate_series(1, 100000)')
branch_cur.execute('SELECT count(*) FROM foo')
assert branch_cur.fetchone() == (200000, )
assert query_scalar(branch_cur, 'SELECT count(*) FROM foo') == 200000
# This test simulates a race condition happening when branch creation and GC are performed concurrently.

View File

@@ -1,9 +1,7 @@
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.utils import print_gc_result
from fixtures.utils import print_gc_result, query_scalar
from fixtures.neon_fixtures import NeonEnvBuilder
@@ -27,26 +25,22 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
pgmain = env.postgres.create_start('test_branch_behind')
log.info("postgres is running on 'test_branch_behind' branch")
main_pg_conn = pgmain.connect()
main_cur = main_pg_conn.cursor()
main_cur = pgmain.connect().cursor()
main_cur.execute("SHOW neon.timeline_id")
timeline = main_cur.fetchone()[0]
timeline = query_scalar(main_cur, "SHOW neon.timeline_id")
# Create table, and insert the first 100 rows
main_cur.execute('CREATE TABLE foo (t text)')
# keep some early lsn to test branch creation on an out-of-date lsn
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
gced_lsn = main_cur.fetchone()[0]
gced_lsn = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
main_cur.execute('''
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100) g
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_a = main_cur.fetchone()[0]
lsn_a = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f'LSN after 100 rows: {lsn_a}')
# Insert some more rows. (This generates enough WAL to fill a few segments.)
@@ -55,8 +49,7 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
SELECT 'long string to consume some space' || g
FROM generate_series(1, 200000) g
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_b = main_cur.fetchone()[0]
lsn_b = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f'LSN after 200100 rows: {lsn_b}')
# Branch at the point where only 100 rows were inserted
@@ -70,10 +63,8 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
SELECT 'long string to consume some space' || g
FROM generate_series(1, 200000) g
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_c = main_cur.fetchone()[0]
lsn_c = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f'LSN after 400100 rows: {lsn_c}')
# Branch at the point where only 200100 rows were inserted
@@ -85,20 +76,15 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
pg_more = env.postgres.create_start('test_branch_behind_more')
# On the 'hundred' branch, we should see only 100 rows
hundred_pg_conn = pg_hundred.connect()
hundred_cur = hundred_pg_conn.cursor()
hundred_cur.execute('SELECT count(*) FROM foo')
assert hundred_cur.fetchone() == (100, )
hundred_cur = pg_hundred.connect().cursor()
assert query_scalar(hundred_cur, 'SELECT count(*) FROM foo') == 100
# On the 'more' branch, we should see 200100 rows
more_pg_conn = pg_more.connect()
more_cur = more_pg_conn.cursor()
more_cur.execute('SELECT count(*) FROM foo')
assert more_cur.fetchone() == (200100, )
more_cur = pg_more.connect().cursor()
assert query_scalar(more_cur, 'SELECT count(*) FROM foo') == 200100
# All the rows are visible on the main branch
main_cur.execute('SELECT count(*) FROM foo')
assert main_cur.fetchone() == (400100, )
assert query_scalar(main_cur, 'SELECT count(*) FROM foo') == 400100
# Check bad lsn's for branching
@@ -107,9 +93,7 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
'test_branch_behind',
ancestor_start_lsn="0/3000000")
pg = env.postgres.create_start('test_branch_segment_boundary')
cur = pg.connect().cursor()
cur.execute('SELECT 1')
assert cur.fetchone() == (1, )
assert pg.safe_psql('SELECT 1')[0][0] == 1
# branch at pre-initdb lsn
with pytest.raises(Exception, match="invalid branch start lsn"):
@@ -122,12 +106,11 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
ancestor_start_lsn="0/42")
# check that we cannot create branch based on garbage collected data
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
# call gc to advance latest_gc_cutoff_lsn
pscur.execute(f"do_gc {env.initial_tenant.hex} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
with env.pageserver.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
# call gc to advance latest_gc_cutoff_lsn
pscur.execute(f"do_gc {env.initial_tenant.hex} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
with pytest.raises(Exception, match="invalid branch start lsn"):
# this gced_lsn is pretty random, so if gc is disabled this wouldn't fail
@@ -136,11 +119,8 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
ancestor_start_lsn=gced_lsn)
# check that after gc everything is still there
hundred_cur.execute('SELECT count(*) FROM foo')
assert hundred_cur.fetchone() == (100, )
assert query_scalar(hundred_cur, 'SELECT count(*) FROM foo') == 100
more_cur.execute('SELECT count(*) FROM foo')
assert more_cur.fetchone() == (200100, )
assert query_scalar(more_cur, 'SELECT count(*) FROM foo') == 200100
main_cur.execute('SELECT count(*) FROM foo')
assert main_cur.fetchone() == (400100, )
assert query_scalar(main_cur, 'SELECT count(*) FROM foo') == 400100

View File

@@ -1,10 +1,14 @@
from typing import List, Tuple
from uuid import UUID
import pytest
import concurrent.futures
from contextlib import closing
from fixtures.neon_fixtures import NeonEnvBuilder, NeonEnv
from fixtures.neon_fixtures import NeonEnvBuilder, NeonEnv, Postgres
from fixtures.log_helper import log
import os
from fixtures.utils import query_scalar
# Test restarting page server, while safekeeper and compute node keep
# running.
@@ -13,7 +17,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_timelines = []
tenant_timelines: List[Tuple[str, str, Postgres]] = []
for n in range(4):
tenant_id_uuid, timeline_id_uuid = env.neon_cli.create_tenant()
@@ -21,13 +25,11 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
timeline_id = timeline_id_uuid.hex
pg = env.postgres.create_start(f'main', tenant_id=tenant_id_uuid)
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100), 'payload'")
with pg.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100), 'payload'")
cur.execute("SHOW neon.timeline_id")
timeline_id = cur.fetchone()[0]
timeline_id = query_scalar(cur, "SHOW neon.timeline_id")
pg.stop()
tenant_timelines.append((tenant_id, timeline_id, pg))
@@ -68,10 +70,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
# Tenant 0 should still work
pg0.start()
with closing(pg0.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM t")
assert cur.fetchone()[0] == 100
assert pg0.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100
# But all others are broken
for n in range(1, 4):

View File

@@ -5,6 +5,7 @@ from contextlib import closing
from fixtures.neon_fixtures import NeonEnv
from fixtures.log_helper import log
from fixtures.utils import query_scalar
#
@@ -32,17 +33,16 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
pg.safe_psql('CREATE EXTENSION neon_test_utils')
# Consume many xids to advance clog
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute('select test_consume_xids(1000*1000*10);')
log.info('xids consumed')
with pg.cursor() as cur:
cur.execute('select test_consume_xids(1000*1000*10);')
log.info('xids consumed')
# call a checkpoint to trigger TruncateSubtrans
cur.execute('CHECKPOINT;')
# call a checkpoint to trigger TruncateSubtrans
cur.execute('CHECKPOINT;')
# ensure WAL flush
cur.execute('select txid_current()')
log.info(cur.fetchone())
# ensure WAL flush
cur.execute('select txid_current()')
log.info(cur.fetchone())
# wait for autovacuum to truncate the pg_xact
# XXX Is it worth to add a timeout here?
@@ -54,11 +54,9 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
time.sleep(5)
# checkpoint to advance latest lsn
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute('CHECKPOINT;')
cur.execute('select pg_current_wal_insert_lsn()')
lsn_after_truncation = cur.fetchone()[0]
with pg.cursor() as cur:
cur.execute('CHECKPOINT;')
lsn_after_truncation = query_scalar(cur, 'select pg_current_wal_insert_lsn()')
# create new branch after clog truncation and start a compute node on it
log.info(f'create branch at lsn_after_truncation {lsn_after_truncation}')

View File

@@ -4,6 +4,7 @@ import pathlib
from contextlib import closing
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.log_helper import log
from fixtures.utils import query_scalar
#
@@ -16,15 +17,13 @@ def test_createdb(neon_simple_env: NeonEnv):
pg = env.postgres.create_start('test_createdb')
log.info("postgres is running on 'test_createdb' branch")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute('VACUUM FULL pg_class')
with pg.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute('VACUUM FULL pg_class')
cur.execute('CREATE DATABASE foodb')
cur.execute('CREATE DATABASE foodb')
cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn = cur.fetchone()[0]
lsn = query_scalar(cur, 'SELECT pg_current_wal_insert_lsn()')
# Create a branch
env.neon_cli.create_branch('test_createdb2', 'test_createdb', ancestor_start_lsn=lsn)
@@ -32,21 +31,21 @@ def test_createdb(neon_simple_env: NeonEnv):
# Test that you can connect to the new database on both branches
for db in (pg, pg2):
with closing(db.connect(dbname='foodb')) as conn:
with conn.cursor() as cur:
# Check database size in both branches
cur.execute("""
select pg_size_pretty(pg_database_size('foodb')),
pg_size_pretty(
sum(pg_relation_size(oid, 'main'))
+sum(pg_relation_size(oid, 'vm'))
+sum(pg_relation_size(oid, 'fsm'))
) FROM pg_class where relisshared is false
""")
res = cur.fetchone()
# check that dbsize equals sum of all relation sizes, excluding shared ones
# This is how we define dbsize in neon for now
assert res[0] == res[1]
with db.cursor(dbname='foodb') as cur:
# Check database size in both branches
cur.execute("""
select pg_size_pretty(pg_database_size('foodb')),
pg_size_pretty(
sum(pg_relation_size(oid, 'main'))
+sum(pg_relation_size(oid, 'vm'))
+sum(pg_relation_size(oid, 'fsm'))
) FROM pg_class where relisshared is false
""")
res = cur.fetchone()
assert res is not None
# check that dbsize equals sum of all relation sizes, excluding shared ones
# This is how we define dbsize in neon for now
assert res[0] == res[1]
#
@@ -58,24 +57,19 @@ def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
pg = env.postgres.create_start('test_dropdb')
log.info("postgres is running on 'test_dropdb' branch")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute('CREATE DATABASE foodb')
with pg.cursor() as cur:
cur.execute('CREATE DATABASE foodb')
cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_before_drop = cur.fetchone()[0]
lsn_before_drop = query_scalar(cur, 'SELECT pg_current_wal_insert_lsn()')
cur.execute("SELECT oid FROM pg_database WHERE datname='foodb';")
dboid = cur.fetchone()[0]
dboid = query_scalar(cur, "SELECT oid FROM pg_database WHERE datname='foodb';")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute('DROP DATABASE foodb')
with pg.cursor() as cur:
cur.execute('DROP DATABASE foodb')
cur.execute('CHECKPOINT')
cur.execute('CHECKPOINT')
cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_after_drop = cur.fetchone()[0]
lsn_after_drop = query_scalar(cur, 'SELECT pg_current_wal_insert_lsn()')
# Create two branches before and after database drop.
env.neon_cli.create_branch('test_before_dropdb',

View File

@@ -1,7 +1,6 @@
from contextlib import closing
from fixtures.neon_fixtures import NeonEnv
from fixtures.log_helper import log
from fixtures.utils import query_scalar
#
@@ -13,15 +12,13 @@ def test_createuser(neon_simple_env: NeonEnv):
pg = env.postgres.create_start('test_createuser')
log.info("postgres is running on 'test_createuser' branch")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute('CREATE USER testuser with password %s', ('testpwd', ))
with pg.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute('CREATE USER testuser with password %s', ('testpwd', ))
cur.execute('CHECKPOINT')
cur.execute('CHECKPOINT')
cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn = cur.fetchone()[0]
lsn = query_scalar(cur, 'SELECT pg_current_wal_insert_lsn()')
# Create a branch
env.neon_cli.create_branch('test_createuser2', 'test_createuser', ancestor_start_lsn=lsn)

View File

@@ -1,10 +1,8 @@
from contextlib import closing
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PortDistributor, VanillaPostgres
from fixtures.neon_fixtures import pg_distrib_dir
import os
from fixtures.utils import subprocess_capture
from fixtures.utils import query_scalar, subprocess_capture
num_rows = 1000
@@ -21,19 +19,17 @@ def test_fullbackup(neon_env_builder: NeonEnvBuilder,
pgmain = env.postgres.create_start('test_fullbackup')
log.info("postgres is running on 'test_fullbackup' branch")
timeline = pgmain.safe_psql("SHOW neon.timeline_id")[0][0]
with pgmain.cursor() as cur:
timeline = query_scalar(cur, "SHOW neon.timeline_id")
with closing(pgmain.connect()) as conn:
with conn.cursor() as cur:
# data loading may take a while, so increase statement timeout
cur.execute("SET statement_timeout='300s'")
cur.execute(f'''CREATE TABLE tbl AS SELECT 'long string to consume some space' || g
from generate_series(1,{num_rows}) g''')
cur.execute("CHECKPOINT")
# data loading may take a while, so increase statement timeout
cur.execute("SET statement_timeout='300s'")
cur.execute(f'''CREATE TABLE tbl AS SELECT 'long string to consume some space' || g
from generate_series(1,{num_rows}) g''')
cur.execute("CHECKPOINT")
cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn = cur.fetchone()[0]
log.info(f"start_backup_lsn = {lsn}")
lsn = query_scalar(cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f"start_backup_lsn = {lsn}")
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.

View File

@@ -3,6 +3,7 @@ import random
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres
from fixtures.log_helper import log
from fixtures.utils import query_scalar
# Test configuration
#
@@ -59,22 +60,21 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
pg = env.postgres.create_start('test_gc_aggressive')
log.info('postgres is running on test_gc_aggressive branch')
conn = pg.connect()
cur = conn.cursor()
with pg.cursor() as cur:
timeline = query_scalar(cur, "SHOW neon.timeline_id")
cur.execute("SHOW neon.timeline_id")
timeline = cur.fetchone()[0]
# Create table, and insert the first 100 rows
cur.execute('CREATE TABLE foo (id int, counter int, t text)')
cur.execute(f'''
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, {num_rows}) g
''')
cur.execute('CREATE INDEX ON foo(id)')
# Create table, and insert the first 100 rows
cur.execute('CREATE TABLE foo (id int, counter int, t text)')
cur.execute(f'''
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, {num_rows}) g
''')
cur.execute('CREATE INDEX ON foo(id)')
asyncio.run(update_and_gc(env, pg, timeline))
asyncio.run(update_and_gc(env, pg, timeline))
cur.execute('SELECT COUNT(*), SUM(counter) FROM foo')
assert cur.fetchone() == (num_rows, updates_to_perform)
cur.execute('SELECT COUNT(*), SUM(counter) FROM foo')
r = cur.fetchone()
assert r is not None
assert r == (num_rows, updates_to_perform)

View File

@@ -8,6 +8,8 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres
from fixtures.log_helper import log
import time
from fixtures.utils import query_scalar
#
# Test pageserver get_lsn_by_timestamp API
@@ -20,11 +22,8 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
pgmain = env.postgres.create_start("test_lsn_mapping")
log.info("postgres is running on 'test_lsn_mapping' branch")
ps_conn = env.pageserver.connect()
ps_cur = ps_conn.cursor()
conn = pgmain.connect()
cur = conn.cursor()
ps_cur = env.pageserver.connect().cursor()
cur = pgmain.connect().cursor()
# Create table, and insert rows, each in a separate transaction
# Disable synchronous_commit to make this initialization go faster.
#
@@ -35,9 +34,8 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
tbl = []
for i in range(1000):
cur.execute(f"INSERT INTO foo VALUES({i})")
cur.execute(f'SELECT clock_timestamp()')
# Get the timestamp in UTC
after_timestamp = cur.fetchone()[0].replace(tzinfo=None)
after_timestamp = query_scalar(cur, 'SELECT clock_timestamp()').replace(tzinfo=None)
tbl.append([i, after_timestamp])
# Execute one more transaction with synchronous_commit enabled, to flush
@@ -47,18 +45,18 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Check edge cases: timestamp in the future
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
ps_cur.execute(
result = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant.hex} {new_timeline_id.hex} '{probe_timestamp.isoformat()}Z'"
)
result = ps_cur.fetchone()[0]
assert result == 'future'
# timestamp too far in the past
probe_timestamp = tbl[0][1] - timedelta(hours=10)
ps_cur.execute(
result = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant.hex} {new_timeline_id.hex} '{probe_timestamp.isoformat()}Z'"
)
result = ps_cur.fetchone()[0]
assert result == 'past'
# Probe a bunch of timestamps in the valid range
@@ -66,19 +64,16 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
probe_timestamp = tbl[i][1]
# Call get_lsn_by_timestamp to get the LSN
ps_cur.execute(
lsn = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant.hex} {new_timeline_id.hex} '{probe_timestamp.isoformat()}Z'"
)
lsn = ps_cur.fetchone()[0]
# Launch a new read-only node at that LSN, and check that only the rows
# that were supposed to be committed at that point in time are visible.
pg_here = env.postgres.create_start(branch_name='test_lsn_mapping',
node_name='test_lsn_mapping_read',
lsn=lsn)
with closing(pg_here.connect()) as conn_here:
with conn_here.cursor() as cur_here:
cur_here.execute("SELECT max(x) FROM foo")
assert cur_here.fetchone()[0] == i
assert pg_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
pg_here.stop_and_destroy()

View File

@@ -1,5 +1,6 @@
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.log_helper import log
from fixtures.utils import query_scalar
#
@@ -14,16 +15,14 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
pg = env.postgres.create_start('test_multixact')
log.info("postgres is running on 'test_multixact' branch")
pg_conn = pg.connect()
cur = pg_conn.cursor()
cur = pg.connect().cursor()
cur.execute('''
CREATE TABLE t1(i int primary key);
INSERT INTO t1 select * from generate_series(1, 100);
''')
cur.execute('SELECT next_multixact_id FROM pg_control_checkpoint()')
next_multixact_id_old = cur.fetchone()[0]
next_multixact_id_old = query_scalar(cur,
'SELECT next_multixact_id FROM pg_control_checkpoint()')
# Lock entries using parallel connections in a round-robin fashion.
nclients = 20
@@ -53,6 +52,7 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
cur.execute(
'SELECT next_multixact_id, pg_current_wal_insert_lsn() FROM pg_control_checkpoint()')
res = cur.fetchone()
assert res is not None
next_multixact_id = res[0]
lsn = res[1]
@@ -64,11 +64,8 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
pg_new = env.postgres.create_start('test_multixact_new')
log.info("postgres is running on 'test_multixact_new' branch")
pg_new_conn = pg_new.connect()
cur_new = pg_new_conn.cursor()
cur_new.execute('SELECT next_multixact_id FROM pg_control_checkpoint()')
next_multixact_id_new = cur_new.fetchone()[0]
next_multixact_id_new = pg_new.safe_psql(
'SELECT next_multixact_id FROM pg_control_checkpoint()')[0][0]
# Check that we restored pg_controlfile correctly
assert next_multixact_id_new == next_multixact_id

View File

@@ -1,6 +1,6 @@
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.log_helper import log
from fixtures.utils import print_gc_result
from fixtures.utils import print_gc_result, query_scalar
import psycopg2.extras
@@ -26,8 +26,7 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder):
cur = pg_conn.cursor()
# Get the timeline ID of our branch. We need it for the 'do_gc' command
cur.execute("SHOW neon.timeline_id")
timeline = cur.fetchone()[0]
timeline = query_scalar(cur, "SHOW neon.timeline_id")
psconn = env.pageserver.connect()
pscur = psconn.cursor(cursor_factory=psycopg2.extras.DictCursor)
@@ -48,6 +47,7 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder):
from pg_settings where name = 'shared_buffers'
''')
row = cur.fetchone()
assert row is not None
log.info(f'shared_buffers is {row[0]}, table size {row[1]}')
assert int(row[0]) < int(row[1])

View File

@@ -30,6 +30,7 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
from pg_settings where name = 'shared_buffers'
''')
row = cur.fetchone()
assert row is not None
log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
assert int(row[0]) < int(row[1])

View File

@@ -1,10 +1,8 @@
import subprocess
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.utils import print_gc_result
from fixtures.utils import print_gc_result, query_scalar
from fixtures.neon_fixtures import NeonEnvBuilder
@@ -24,9 +22,7 @@ def test_pitr_gc(neon_env_builder: NeonEnvBuilder):
main_pg_conn = pgmain.connect()
main_cur = main_pg_conn.cursor()
main_cur.execute("SHOW neon.timeline_id")
timeline = main_cur.fetchone()[0]
timeline = query_scalar(main_cur, "SHOW neon.timeline_id")
# Create table
main_cur.execute('CREATE TABLE foo (t text)')
@@ -41,12 +37,15 @@ def test_pitr_gc(neon_env_builder: NeonEnvBuilder):
# keep some early lsn to test branch creation after GC
main_cur.execute('SELECT pg_current_wal_insert_lsn(), txid_current()')
res = main_cur.fetchone()
assert res is not None
lsn_a = res[0]
xid_a = res[1]
log.info(f'LSN after 100 rows: {lsn_a} xid {xid_a}')
main_cur.execute('SELECT pg_current_wal_insert_lsn(), txid_current()')
res = main_cur.fetchone()
assert res is not None
debug_lsn = res[0]
debug_xid = res[1]
log.info(f'LSN after 10000 rows: {debug_lsn} xid {debug_xid}')

View File

@@ -6,6 +6,8 @@ from fixtures.log_helper import log
from psycopg2.errors import UndefinedTable
from psycopg2.errors import IoError
from fixtures.utils import query_scalar
pytest_plugins = ("fixtures.neon_fixtures")
extensions = ["pageinspect", "neon_test_utils", "pg_buffercache"]
@@ -32,9 +34,9 @@ def test_read_validation(neon_simple_env: NeonEnv):
c.execute("select lsn, lower, upper from page_header(get_raw_page('foo', 'main', 0));")
first = c.fetchone()
assert first is not None
c.execute("select relfilenode from pg_class where relname = 'foo'")
relfilenode = c.fetchone()[0]
relfilenode = query_scalar(c, "select relfilenode from pg_class where relname = 'foo'")
c.execute("insert into foo values (2);")
c.execute("select lsn, lower, upper from page_header(get_raw_page('foo', 'main', 0));")
@@ -44,22 +46,25 @@ def test_read_validation(neon_simple_env: NeonEnv):
log.info("Test table is populated, validating buffer cache")
c.execute(
cache_entries = query_scalar(
c,
"select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode))
assert c.fetchone()[0] > 0, "No buffers cached for the test relation"
assert cache_entries > 0, "No buffers cached for the test relation"
c.execute(
"select reltablespace, reldatabase, relfilenode from pg_buffercache where relfilenode = {}"
.format(relfilenode))
reln = c.fetchone()
assert reln is not None
log.info("Clear buffer cache to ensure no stale pages are brought into the cache")
c.execute("select clear_buffer_cache()")
c.execute(
cache_entries = query_scalar(
c,
"select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode))
assert c.fetchone()[0] == 0, "Failed to clear buffer cache"
assert cache_entries == 0, "Failed to clear buffer cache"
log.info("Cache is clear, reading stale page version")
@@ -69,9 +74,10 @@ def test_read_validation(neon_simple_env: NeonEnv):
direct_first = c.fetchone()
assert first == direct_first, "Failed to fetch page at historic lsn"
c.execute(
cache_entries = query_scalar(
c,
"select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode))
assert c.fetchone()[0] == 0, "relation buffers detected after invalidation"
assert cache_entries == 0, "relation buffers detected after invalidation"
log.info("Cache is clear, reading latest page version without cache")
@@ -81,9 +87,10 @@ def test_read_validation(neon_simple_env: NeonEnv):
direct_latest = c.fetchone()
assert second == direct_latest, "Failed to fetch page at latest lsn"
c.execute(
cache_entries = query_scalar(
c,
"select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode))
assert c.fetchone()[0] == 0, "relation buffers detected after invalidation"
assert cache_entries == 0, "relation buffers detected after invalidation"
log.info(
"Cache is clear, reading stale page version without cache using relation identifiers"

View File

@@ -1,6 +1,7 @@
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import query_scalar
#
@@ -27,7 +28,7 @@ def test_readonly_node(neon_simple_env: NeonEnv):
FROM generate_series(1, 100) g
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_a = main_cur.fetchone()[0]
lsn_a = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info('LSN after 100 rows: ' + lsn_a)
# Insert some more rows. (This generates enough WAL to fill a few segments.)
@@ -36,8 +37,7 @@ def test_readonly_node(neon_simple_env: NeonEnv):
SELECT 'long string to consume some space' || g
FROM generate_series(1, 200000) g
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_b = main_cur.fetchone()[0]
lsn_b = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info('LSN after 200100 rows: ' + lsn_b)
# Insert many more rows. This generates enough WAL to fill a few segments.
@@ -47,8 +47,7 @@ def test_readonly_node(neon_simple_env: NeonEnv):
FROM generate_series(1, 200000) g
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_c = main_cur.fetchone()[0]
lsn_c = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info('LSN after 400100 rows: ' + lsn_c)
# Create first read-only node at the point where only 100 rows were inserted

View File

@@ -8,7 +8,7 @@ import time
from uuid import UUID
from fixtures.neon_fixtures import NeonEnvBuilder, assert_timeline_local, wait_until, wait_for_last_record_lsn, wait_for_upload
from fixtures.log_helper import log
from fixtures.utils import lsn_from_hex, lsn_to_hex
from fixtures.utils import lsn_from_hex, query_scalar
import pytest
@@ -57,14 +57,12 @@ def test_remote_storage_backup_and_restore(neon_env_builder: NeonEnvBuilder, sto
checkpoint_numbers = range(1, 3)
for checkpoint_number in checkpoint_numbers:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute(f'''
CREATE TABLE t{checkpoint_number}(id int primary key, secret text);
INSERT INTO t{checkpoint_number} VALUES ({data_id}, '{data_secret}|{checkpoint_number}');
''')
cur.execute("SELECT pg_current_wal_flush_lsn()")
current_lsn = lsn_from_hex(cur.fetchone()[0])
with pg.cursor() as cur:
cur.execute(f'''
CREATE TABLE t{checkpoint_number}(id int primary key, secret text);
INSERT INTO t{checkpoint_number} VALUES ({data_id}, '{data_secret}|{checkpoint_number}');
''')
current_lsn = lsn_from_hex(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# wait until pageserver receives that data
wait_for_last_record_lsn(client, UUID(tenant_id), UUID(timeline_id), current_lsn)
@@ -123,8 +121,8 @@ def test_remote_storage_backup_and_restore(neon_env_builder: NeonEnvBuilder, sto
assert not detail['remote']['awaits_download']
pg = env.postgres.create_start('main')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
for checkpoint_number in checkpoint_numbers:
cur.execute(f'SELECT secret FROM t{checkpoint_number} WHERE id = {data_id};')
assert cur.fetchone() == (f'{data_secret}|{checkpoint_number}', )
with pg.cursor() as cur:
for checkpoint_number in checkpoint_numbers:
assert query_scalar(cur,
f'SELECT secret FROM t{checkpoint_number} WHERE id = {data_id};'
) == f'{data_secret}|{checkpoint_number}'

View File

@@ -8,6 +8,7 @@
import asyncio
from contextlib import closing
from typing import List, Tuple
from uuid import UUID
import pytest
@@ -59,7 +60,7 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, storage_type: str):
env = neon_env_builder.init_start()
tenants_pgs = []
tenants_pgs: List[Tuple[UUID, Postgres]] = []
for i in range(1, 5):
# Use a tiny checkpoint distance, to create a lot of layers quickly
@@ -80,14 +81,11 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, storage_type: str):
# Wait for the remote storage uploads to finish
pageserver_http = env.pageserver.http_client()
for tenant, pg in tenants_pgs:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("show neon.tenant_id")
tenant_id = cur.fetchone()[0]
cur.execute("show neon.timeline_id")
timeline_id = cur.fetchone()[0]
cur.execute("SELECT pg_current_wal_flush_lsn()")
current_lsn = lsn_from_hex(cur.fetchone()[0])
res = pg.safe_psql_many(
["SHOW neon.tenant_id", "SHOW neon.timeline_id", "SELECT pg_current_wal_flush_lsn()"])
tenant_id = res[0][0][0]
timeline_id = res[1][0][0]
current_lsn = lsn_from_hex(res[2][0][0])
# wait until pageserver receives all the data
wait_for_last_record_lsn(pageserver_http, UUID(tenant_id), UUID(timeline_id), current_lsn)

View File

@@ -102,17 +102,14 @@ def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60
raise RuntimeError(
f"timed out waiting for pageserver to reach pg_current_wal_flush_lsn()")
with closing(pgmain.connect()) as conn:
with conn.cursor() as cur:
cur.execute('''
select pg_size_pretty(pg_cluster_size()),
pg_wal_lsn_diff(pg_current_wal_flush_lsn(),received_lsn) as received_lsn_lag
FROM backpressure_lsns();
''')
res = cur.fetchone()
log.info(f"pg_cluster_size = {res[0]}, received_lsn_lag = {res[1]}")
received_lsn_lag = res[1]
res = pgmain.safe_psql('''
SELECT
pg_size_pretty(pg_cluster_size()),
pg_wal_lsn_diff(pg_current_wal_flush_lsn(), received_lsn) as received_lsn_lag
FROM backpressure_lsns();
''')[0]
log.info(f"pg_cluster_size = {res[0]}, received_lsn_lag = {res[1]}")
received_lsn_lag = res[1]
time.sleep(polling_interval)

View File

@@ -15,7 +15,7 @@ from dataclasses import dataclass, field
from multiprocessing import Process, Value
from pathlib import Path
from fixtures.neon_fixtures import NeonPageserver, PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, neon_binpath, PgProtocol, wait_for_last_record_lsn, wait_for_upload
from fixtures.utils import get_dir_size, lsn_to_hex, lsn_from_hex
from fixtures.utils import get_dir_size, lsn_to_hex, lsn_from_hex, query_scalar
from fixtures.log_helper import log
from typing import List, Optional, Any
from uuid import uuid4
@@ -229,8 +229,7 @@ def test_restarts(neon_env_builder: NeonEnvBuilder):
else:
failed_node.start()
failed_node = None
cur.execute('SELECT sum(key) FROM t')
assert cur.fetchone() == (500500, )
assert query_scalar(cur, 'SELECT sum(key) FROM t') == 500500
# Test that safekeepers push their info to the broker and learn peer status from it
@@ -286,12 +285,10 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
env.neon_cli.create_branch('test_safekeepers_wal_removal')
pg = env.postgres.create_start('test_safekeepers_wal_removal')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
cur.execute('CREATE TABLE t(key int primary key, value text)')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
pg.safe_psql_many([
'CREATE TABLE t(key int primary key, value text)',
"INSERT INTO t SELECT generate_series(1,100000), 'payload'",
])
tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
timeline_id = pg.safe_psql("show neon.timeline_id")[0][0]
@@ -469,8 +466,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, storage_type: str):
cur.execute("insert into t select generate_series(1,500000), 'payload'")
expected_sum += 500000 * 500001 // 2
cur.execute("select sum(key) from t")
assert cur.fetchone()[0] == expected_sum
assert query_scalar(cur, "select sum(key) from t") == expected_sum
for sk in env.safekeepers:
wait_segment_offload(tenant_id, timeline_id, sk, seg_end)
@@ -484,8 +480,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, storage_type: str):
# require WAL to be trimmed, so no more than one segment is left on disk
wait_wal_trim(tenant_id, timeline_id, sk, 16 * 1.5)
cur.execute('SELECT pg_current_wal_flush_lsn()')
last_lsn = cur.fetchone()[0]
last_lsn = query_scalar(cur, 'SELECT pg_current_wal_flush_lsn()')
pageserver_lsn = env.pageserver.http_client().timeline_detail(
uuid.UUID(tenant_id), uuid.UUID((timeline_id)))["local"]["last_record_lsn"]
@@ -532,10 +527,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, storage_type: str):
# verify data
pg.create_start('test_s3_wal_replay')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("select sum(key) from t")
assert cur.fetchone()[0] == expected_sum
assert pg.safe_psql("select sum(key) from t")[0][0] == expected_sum
class ProposerPostgres(PgProtocol):
@@ -860,12 +852,10 @@ def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
# as waiting for acceptors happens there
cur.execute('CREATE TABLE IF NOT EXISTS t(key int, value text)')
cur.execute("INSERT INTO t VALUES (0, 'something')")
cur.execute('SELECT SUM(key) FROM t')
sum_before = cur.fetchone()[0]
sum_before = query_scalar(cur, 'SELECT SUM(key) FROM t')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute('SELECT SUM(key) FROM t')
sum_after = cur.fetchone()[0]
sum_after = query_scalar(cur, 'SELECT SUM(key) FROM t')
assert sum_after == sum_before + 5000050000
def show_statuses(safekeepers: List[Safekeeper], tenant_id: str, timeline_id: str):
@@ -950,8 +940,7 @@ def test_wal_deleted_after_broadcast(neon_env_builder: NeonEnvBuilder):
assert pg.pgdata_dir is not None
log.info('executing INSERT to generate WAL')
cur.execute("select pg_current_wal_lsn()")
current_lsn = lsn_from_hex(cur.fetchone()[0]) / 1024 / 1024
current_lsn = lsn_from_hex(query_scalar(cur, "select pg_current_wal_lsn()")) / 1024 / 1024
pg_wal_size = get_dir_size(os.path.join(pg.pgdata_dir, 'pg_wal')) / 1024 / 1024
if enable_logs:
log.info(f"LSN delta: {current_lsn - last_lsn} MB, current WAL size: {pg_wal_size} MB")

View File

@@ -50,7 +50,6 @@ def test_pg_regress(neon_simple_env: NeonEnv, test_output_dir: pathlib.Path, pg_
# checkpoint one more time to ensure that the lsn we get is the latest one
pg.safe_psql('CHECKPOINT')
pg.safe_psql('select pg_current_wal_insert_lsn()')[0][0]
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, pg)

View File

@@ -70,6 +70,7 @@ class PgCompare(ABC):
for pg_stat in pg_stats:
cur.execute(pg_stat.query)
row = cur.fetchone()
assert row is not None
assert len(row) == len(pg_stat.columns)
for col, val in zip(pg_stat.columns, row):

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from dataclasses import field
from contextlib import contextmanager
from enum import Flag, auto
import textwrap
from cached_property import cached_property
@@ -306,6 +307,15 @@ class PgProtocol:
conn.autocommit = autocommit
return conn
@contextmanager
def cursor(self, autocommit=True, **kwargs):
"""
Shorthand for pg.connect().cursor().
The cursor and connection are closed when the context is exited.
"""
with closing(self.connect(autocommit=autocommit, **kwargs)) as conn:
yield conn.cursor()
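# Hypothetical usage sketch (illustration only, not a line from this
# commit's diff):
#     with pg.cursor() as cur:
#         cur.execute('SELECT 1')
# Unlike the bare pg.connect().cursor() pattern used at many call sites,
# the connection is closed when the with-block exits.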
async def connect_async(self, **kwargs) -> asyncpg.Connection:
"""
Connect to the node from async python.
@@ -354,7 +364,7 @@ class PgProtocol:
if cur.description is None:
result.append([]) # query didn't return data
else:
result.append(cast(List[Any], cur.fetchall()))
result.append(cur.fetchall())
return result
@@ -2142,12 +2152,8 @@ def list_files_to_compare(pgdata_dir: pathlib.Path):
# pg is the existing and running compute node, that we want to compare with a basebackup
def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, pg: Postgres):
# Get the timeline ID. We need it for the 'basebackup' command
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SHOW neon.timeline_id")
timeline = cur.fetchone()[0]
timeline = pg.safe_psql("SHOW neon.timeline_id")[0][0]
# stop postgres to ensure that files won't change
pg.stop()

View File

@@ -6,6 +6,8 @@ import subprocess
from pathlib import Path
from typing import Any, List, Tuple
from psycopg2.extensions import cursor
from fixtures.log_helper import log
@@ -79,6 +81,20 @@ def etcd_path() -> Path:
return Path(path_output)
def query_scalar(cur: cursor, query: str) -> Any:
"""
A convenience wrapper to avoid repeating the
cur.execute(); cur.fetchone()[0] pattern.
It is also mypy-friendly: without the None check,
mypy complains that Optional is not indexable.
"""
cur.execute(query)
var = cur.fetchone()
assert var is not None
return var[0]
# Traverse directory to get total size.
def get_dir_size(path: str) -> int:
"""Return size in bytes."""

View File

@@ -9,6 +9,8 @@ import psycopg2.extras
import random
import time
from fixtures.utils import query_scalar
# This is a clear-box test that demonstrates the worst case scenario for the
# "1 segment per layer" implementation of the pageserver. It writes to random
@@ -59,9 +61,7 @@ def test_random_writes(neon_with_baseline: PgCompare):
rows_inserted += rows_to_insert
# Get table size (can't be predicted because padding and alignment)
cur.execute("SELECT pg_relation_size('Big');")
row = cur.fetchone()
table_size = row[0]
table_size = query_scalar(cur, "SELECT pg_relation_size('Big')")
env.zenbenchmark.record("table_size", table_size, 'bytes', MetricReport.TEST_PARAM)
# Decide how much to write, based on knowledge of pageserver implementation.

View File

@@ -34,6 +34,7 @@ def test_seqscans(neon_with_baseline: PgCompare, rows: int, iters: int, workers:
from pg_settings where name = 'shared_buffers'
''')
row = cur.fetchone()
assert row is not None
shared_buffers = row[0]
table_size = row[1]
log.info(f"shared_buffers is {shared_buffers}, table size {table_size}")