from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.pg_version import PgVersion
from fixtures.utils import wait_until


def _create_owner_role_and_db(cur):
    """Create the `mr_whiskers` role and the `neondb` database it owns.

    The role is created as a member of `neon_superuser`; `neon_superuser`
    is additionally granted all privileges on the new database.
    """
    cur.execute(
        "CREATE ROLE mr_whiskers WITH PASSWORD 'cat' LOGIN INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser"
    )
    cur.execute("CREATE DATABASE neondb WITH OWNER mr_whiskers")
    cur.execute("GRANT ALL PRIVILEGES ON DATABASE neondb TO neon_superuser")


def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
    """Check that a `neon_superuser` member can set up logical replication.

    Two endpoints are started on separate branches (a publisher and a
    subscriber). On each, a role that is a member of `neon_superuser` is
    created together with a database it owns. Connected as that role, the
    test verifies the role membership, creates a publication on the
    publisher and a subscription on the subscriber, and then waits until
    rows inserted on the publisher are visible on the subscriber.
    """
    env = neon_simple_env

    env.neon_cli.create_branch("test_neon_superuser_publisher", "empty")
    pub = env.endpoints.create("test_neon_superuser_publisher")

    env.neon_cli.create_branch("test_neon_superuser_subscriber")
    sub = env.endpoints.create("test_neon_superuser_subscriber")

    pub.respec(skip_pg_catalog_updates=False)
    pub.start()

    sub.respec(skip_pg_catalog_updates=False)
    sub.start()

    pub.wait_for_migrations()
    sub.wait_for_migrations()

    with pub.cursor() as cur:
        _create_owner_role_and_db(cur)

        # If we don't do this, creating the subscription will fail later on PG16
        pub.edit_hba(["host all mr_whiskers 0.0.0.0/0 md5"])

    with sub.cursor() as cur:
        _create_owner_role_and_db(cur)

    with pub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as cur:
        cur.execute("SELECT pg_has_role('mr_whiskers', 'neon_superuser', 'member')")
        assert cur.fetchall()[0][0]
        cur.execute("SELECT pg_has_role('mr_whiskers', 'neon_superuser', 'usage')")
        assert cur.fetchall()[0][0]

        # The 'set' privilege check is only performed on PG16
        # (presumably because that privilege type does not exist on older
        # versions -- confirm against the PostgreSQL release notes).
        if pg_version == PgVersion.V16:
            cur.execute("SELECT pg_has_role('mr_whiskers', 'neon_superuser', 'set')")
            assert cur.fetchall()[0][0]

        cur.execute("CREATE PUBLICATION pub FOR ALL TABLES")
        cur.execute("CREATE ROLE definitely_not_a_superuser WITH PASSWORD 'nope'")
        cur.execute("CREATE DATABASE definitely_a_database")
        cur.execute("CREATE TABLE t (a int)")
        cur.execute("INSERT INTO t VALUES (10), (20)")
        # ORDER BY makes the ordered comparison below well-defined; without it
        # the row order returned by SELECT is unspecified.
        cur.execute("SELECT * from t ORDER BY a")
        res = cur.fetchall()
        assert [r[0] for r in res] == [10, 20]

    with sub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as cur:
        # The subscriber needs a matching table to receive replicated rows.
        cur.execute("CREATE TABLE t (a int)")

        pub_conn = f"host=localhost port={pub.pg_port} dbname=neondb user=mr_whiskers password=cat"
        query = f"CREATE SUBSCRIPTION sub CONNECTION '{pub_conn}' PUBLICATION pub"
        log.info(f"Creating subscription: {query}")
        cur.execute(query)

        # Insert more rows on the publisher after the subscription exists.
        with pub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as pcur:
            pcur.execute("INSERT INTO t VALUES (30), (40)")

        def check_that_changes_propagated():
            # Runs on the subscriber cursor; ordered explicitly so the list
            # comparison does not depend on physical row order.
            cur.execute("SELECT * FROM t ORDER BY a")
            res = cur.fetchall()
            log.info(res)
            assert len(res) == 4
            assert [r[0] for r in res] == [10, 20, 30, 40]

        # Retry the check until it passes; the arguments are presumably
        # (attempts, interval in seconds) -- confirm against fixtures.utils.
        wait_until(10, 0.5, check_that_changes_propagated)