mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
## Problem See https://neondb.slack.com/archives/C03QLRH7PPD/p1723038557449239?thread_ts=1722868375.476789&cid=C03QLRH7PPD Logical replication subscriptions use `synchronous_commit=off` by default, which causes problems with the safekeeper ## Summary of changes Set `synchronous_commit=on` for the logical replication subscription in test_subscriber_restart.py ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? If so, did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. ## Checklist before merging - [ ] Do not forget to reformat commit message to not include the above checklist --------- Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech> Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
60 lines
2.2 KiB
Python
60 lines
2.2 KiB
Python
import threading
|
|
import time
|
|
|
|
from fixtures.neon_fixtures import NeonEnv
|
|
from fixtures.utils import wait_until
|
|
|
|
|
|
# This test checks if a logical replication subscriber is able to correctly restart replication without receiving duplicates.
|
|
# It requires tracking information about replication origins on the page server side.
|
|
def test_subscriber_restart(neon_simple_env: NeonEnv):
    """Restart a logical-replication subscriber repeatedly while rows are being published.

    A publisher endpoint and a subscriber endpoint are created on separate
    branches and connected through a publication/subscription on table ``t``.
    A background thread inserts ``n_records`` rows into the publisher while the
    subscriber is stopped (in "immediate" mode) and started ``n_restarts``
    times. Afterwards one extra row is inserted and the test waits until the
    subscriber's row count equals the total number of rows inserted on the
    publisher.

    Args:
        neon_simple_env: Environment fixture used to create the branches and
            endpoints.
    """
    env = neon_simple_env
    env.neon_cli.create_branch("publisher")
    pub = env.endpoints.create("publisher")
    pub.start()

    env.neon_cli.create_branch("subscriber")
    sub = env.endpoints.create("subscriber")
    sub.start()

    n_records = 100000
    n_restarts = 100

    def check_that_changes_propagated():
        # `scur` and `n_records` are resolved from the enclosing scope at call
        # time, so this sees the cursor opened for the final wait and the
        # record count after it has been incremented.
        scur.execute("SELECT count(*) FROM t")
        res = scur.fetchall()
        assert res[0][0] == n_records

    def insert_data(pub):
        # Runs in a background thread with its own publisher cursor.
        with pub.cursor() as pcur:
            for i in range(n_records):
                pcur.execute("INSERT into t values (%s,random()*100000)", (i,))

    with pub.cursor() as pcur:
        # The subscriber cursor is scoped to the setup phase only: the
        # subscriber is restarted below, and a fresh cursor is opened for the
        # final check.
        with sub.cursor() as scur:
            pcur.execute("CREATE TABLE t (pk integer primary key, sk integer)")
            pcur.execute("CREATE PUBLICATION pub FOR TABLE t")
            scur.execute("CREATE TABLE t (pk integer primary key, sk integer)")
            # scur.execute("CREATE INDEX on t(sk)") # slowdown applying WAL at replica
            pub_conn = f"host=localhost port={pub.pg_port} dbname=postgres user=cloud_admin"
            # synchronous_commit=on to test a hypothesis for why this test has been flaky.
            # XXX: Add link to the issue
            query = f"CREATE SUBSCRIPTION sub CONNECTION '{pub_conn}' PUBLICATION pub with (synchronous_commit=on)"
            scur.execute(query)
            time.sleep(2)  # let initial table sync complete

        thread = threading.Thread(target=insert_data, args=(pub,), daemon=True)
        thread.start()

        # Restart the subscriber repeatedly while the inserts are in flight.
        for _ in range(n_restarts):
            sub.stop("immediate")
            sub.start()

        thread.join()

        # One more row after all restarts; its key is the next unused pk.
        # Passed as a bound parameter, matching insert_data above.
        pcur.execute("INSERT into t values (%s, 0)", (n_records,))
        n_records += 1
        with sub.cursor() as scur:
            wait_until(60, 0.5, check_that_changes_propagated)