mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 03:52:56 +00:00
Fix zenith feedback processing at compute node.
Add test for backpressure
This commit is contained in:
committed by
Anastasia Lubennikova
parent
1a4682a04a
commit
74a0942a77
154
test_runner/batch_others/test_backpressure.py
Normal file
154
test_runner/batch_others/test_backpressure.py
Normal file
@@ -0,0 +1,154 @@
|
||||
from contextlib import closing, contextmanager
|
||||
import psycopg2.extras
|
||||
from fixtures.zenith_fixtures import ZenithEnvBuilder
|
||||
from fixtures.log_helper import log
|
||||
import os
|
||||
import time
|
||||
import asyncpg
|
||||
from fixtures.zenith_fixtures import Postgres
|
||||
import threading
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
|
||||
@contextmanager
def pg_cur(pg):
    """Open a connection to *pg*, yield a cursor on it, and close the
    connection when the block exits (even on error)."""
    conn = pg.connect()
    with closing(conn):
        with conn.cursor() as cur:
            yield cur
|
||||
|
||||
|
||||
# Periodically check that all backpressure lags are below the configured
# threshold, assert if they are not.
# If the check query fails, stop the thread. Main thread should notice that
# and stop the test.
def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interval=5):
    log.info("checks started")

    # Read the three configured lag limits, in bytes. A limit of 0 means the
    # corresponding backpressure knob is disabled and is skipped below.
    limits = {}
    with pg_cur(pg) as cur:
        cur.execute("CREATE EXTENSION zenith")  # TODO move it to zenith_fixtures?
        for setting in ("max_replication_write_lag",
                        "max_replication_flush_lag",
                        "max_replication_apply_lag"):
            cur.execute(f"select pg_size_bytes(current_setting('{setting}'))")
            limits[setting] = cur.fetchone()[0]
            log.info(f"{setting}: {limits[setting]} bytes")

    with pg_cur(pg) as cur:
        while not stop_event.is_set():
            try:
                cur.execute('''
                select pg_wal_lsn_diff(pg_current_wal_flush_lsn(),received_lsn) as received_lsn_lag,
                pg_wal_lsn_diff(pg_current_wal_flush_lsn(),disk_consistent_lsn) as disk_consistent_lsn_lag,
                pg_wal_lsn_diff(pg_current_wal_flush_lsn(),remote_consistent_lsn) as remote_consistent_lsn_lag,
                pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_flush_lsn(),received_lsn)),
                pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_flush_lsn(),disk_consistent_lsn)),
                pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_flush_lsn(),remote_consistent_lsn))
                from backpressure_lsns();
                ''')

                (received_lsn_lag, disk_consistent_lsn_lag, remote_consistent_lsn_lag,
                 received_pretty, disk_pretty, remote_pretty) = cur.fetchone()

                log.info(f"received_lsn_lag = {received_lsn_lag} ({received_pretty}), "
                         f"disk_consistent_lsn_lag = {disk_consistent_lsn_lag} ({disk_pretty}), "
                         f"remote_consistent_lsn_lag = {remote_consistent_lsn_lag} ({remote_pretty})")

                # Since feedback from pageserver is not immediate, we should
                # allow some lag overflow
                lag_overflow = 5 * 1024 * 1024  # 5MB

                # Check each lag against its limit, in the same order as the
                # columns above; a disabled (0) limit is not enforced.
                for lag, limit in ((received_lsn_lag, limits["max_replication_write_lag"]),
                                   (disk_consistent_lsn_lag, limits["max_replication_flush_lag"]),
                                   (remote_consistent_lsn_lag, limits["max_replication_apply_lag"])):
                    if limit > 0:
                        assert lag < limit + lag_overflow

                time.sleep(polling_interval)

            except Exception as e:
                # Both query failures and lag-assertion failures land here;
                # either way, stop so the main thread notices.
                log.info(f"backpressure check query failed: {e}")
                stop_event.set()

    log.info('check thread stopped')
|
||||
|
||||
|
||||
# This test illustrates how to tune backpressure to control the lag
# between the WAL flushed on compute node and WAL digested by pageserver.
#
# To test it, throttle walreceiver ingest using failpoint and run heavy
# write load. If backpressure is disabled or not tuned properly, the query
# will timeout, because the walreceiver cannot keep up. If backpressure is
# enabled and tuned properly, insertion will be throttled, but the query
# will not timeout.
def test_backpressure_received_lsn_lag(zenith_env_builder: ZenithEnvBuilder):
    zenith_env_builder.num_safekeepers = 1
    env = zenith_env_builder.init()

    # Create a branch for us
    env.zenith_cli.create_branch("test_backpressure", "main")

    pg = env.postgres.create_start('test_backpressure',
                                   config_lines=['max_replication_write_lag=30MB'])
    log.info("postgres is running on 'test_backpressure' branch")

    # Start a background thread that keeps verifying the lags stay bounded.
    check_stop_event = threading.Event()
    check_thread = threading.Thread(target=check_backpressure, args=(pg, check_stop_event))
    check_thread.start()

    # Configure failpoint to slow down walreceiver ingest
    with closing(env.pageserver.connect()) as psconn:
        with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
            pscur.execute("failpoints walreceiver-after-ingest=sleep(20)")

    # FIXME: wait for the check thread to start.
    #
    # If the load starts too soon, the check thread cannot auth, because it
    # is not able to connect to the database because of the lag and waiting
    # for the lsn to replay to arrive.
    time.sleep(2)

    inserts_to_do = 2000000
    rows_per_batch = 100000
    rows_inserted = 0

    with pg_cur(pg) as cur:
        # Create and initialize test table
        cur.execute("CREATE TABLE foo(x bigint)")

        # Keep inserting while the checker is alive; a dead checker means a
        # lag assertion (or its query) failed.
        while check_thread.is_alive() and rows_inserted < inserts_to_do:
            try:
                cur.execute("INSERT INTO foo select from generate_series(1, 100000)")
                rows_inserted += rows_per_batch
            except Exception as e:
                if check_thread.is_alive():
                    # Lags were still within bounds, so the insert itself failed.
                    log.info('stopping check thread')
                    check_stop_event.set()
                    check_thread.join()
                    assert False, f"Exception {e} while inserting rows, but WAL lag is within configured threshold. That means backpressure is not tuned properly"
                else:
                    assert False, f"Exception {e} while inserting rows and WAL lag overflowed configured threshold. That means backpressure doesn't work."

        log.info(f"inserted {rows_inserted} rows")

    if check_thread.is_alive():
        log.info('stopping check thread')
        check_stop_event.set()
        check_thread.join()
        log.info('check thread stopped')
    else:
        assert False, "WAL lag overflowed configured threshold. That means backpressure doesn't work."
|
||||
|
||||
|
||||
#TODO test_backpressure_disk_consistent_lsn_lag. Play with pageserver's checkpoint settings
|
||||
#TODO test_backpressure_remote_consistent_lsn_lag
|
||||
@@ -39,28 +39,28 @@ def test_timeline_size(zenith_simple_env: ZenithEnv):
|
||||
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
|
||||
|
||||
|
||||
# wait until received_lsn_lag is 0
def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60):
    """Block until the pageserver has received all WAL flushed by *pgmain*.

    Polls backpressure_lsns() every *polling_interval* seconds and returns
    once received_lsn_lag drops to 0 (or below).

    Raises:
        RuntimeError: if the lag has not reached 0 within *timeout* seconds.
    """
    started_at = time.time()

    # Seed with a positive value so the loop body runs at least once.
    received_lsn_lag = 1
    while received_lsn_lag > 0:
        elapsed = time.time() - started_at
        if elapsed > timeout:
            # Fix: original used an f-string with no placeholders.
            raise RuntimeError(
                "timed out waiting for pageserver to reach pg_current_wal_flush_lsn()")

        # Reconnect on every poll so a stuck connection cannot wedge the wait.
        with closing(pgmain.connect()) as conn:
            with conn.cursor() as cur:
                cur.execute('''
                select pg_size_pretty(pg_cluster_size()),
                pg_wal_lsn_diff(pg_current_wal_flush_lsn(),received_lsn) as received_lsn_lag
                FROM backpressure_lsns();
                ''')
                res = cur.fetchone()
                log.info(f"pg_cluster_size = {res[0]}, received_lsn_lag = {res[1]}")
                received_lsn_lag = res[1]

        time.sleep(polling_interval)
|
||||
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: d914790e6c...a3709cc364
Reference in New Issue
Block a user