mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 04:00:38 +00:00
* Add --id argument to safekeeper setting its unique u64 id. In preparation for storage node messaging. IDs are supposed to be monotonically assigned by the console. In tests it is issued by ZenithEnv; at the zenith cli level and fixtures, string name is completely replaced by integer id. Example TOML configs are adjusted accordingly. Sequential ids are chosen over Zid mainly because they are compact and easy to type/remember. * add node id to pageserver This adds node id parameter to pageserver configuration. Also I use a simple builder to construct pageserver config struct to avoid setting node id to some temporary invalid value. Some of the changes in test fixtures are needed to split init and start operations for envrionment. Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
155 lines
6.6 KiB
Python
155 lines
6.6 KiB
Python
from contextlib import closing, contextmanager
|
|
import psycopg2.extras
|
|
from fixtures.zenith_fixtures import ZenithEnvBuilder
|
|
from fixtures.log_helper import log
|
|
import os
|
|
import time
|
|
import asyncpg
|
|
from fixtures.zenith_fixtures import Postgres
|
|
import threading
|
|
|
|
pytest_plugins = ("fixtures.zenith_fixtures")
|
|
|
|
|
|
@contextmanager
|
|
def pg_cur(pg):
|
|
with closing(pg.connect()) as conn:
|
|
with conn.cursor() as cur:
|
|
yield cur
|
|
|
|
|
|
# Periodically check that all backpressure lags are below the configured threshold,
|
|
# assert if they are not.
|
|
# If the check query fails, stop the thread. Main thread should notice that and stop the test.
|
|
def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interval=5):
|
|
log.info("checks started")
|
|
|
|
with pg_cur(pg) as cur:
|
|
cur.execute("CREATE EXTENSION zenith") # TODO move it to zenith_fixtures?
|
|
|
|
cur.execute("select pg_size_bytes(current_setting('max_replication_write_lag'))")
|
|
res = cur.fetchone()
|
|
max_replication_write_lag_bytes = res[0]
|
|
log.info(f"max_replication_write_lag: {max_replication_write_lag_bytes} bytes")
|
|
|
|
cur.execute("select pg_size_bytes(current_setting('max_replication_flush_lag'))")
|
|
res = cur.fetchone()
|
|
max_replication_flush_lag_bytes = res[0]
|
|
log.info(f"max_replication_flush_lag: {max_replication_flush_lag_bytes} bytes")
|
|
|
|
cur.execute("select pg_size_bytes(current_setting('max_replication_apply_lag'))")
|
|
res = cur.fetchone()
|
|
max_replication_apply_lag_bytes = res[0]
|
|
log.info(f"max_replication_apply_lag: {max_replication_apply_lag_bytes} bytes")
|
|
|
|
with pg_cur(pg) as cur:
|
|
while not stop_event.is_set():
|
|
try:
|
|
cur.execute('''
|
|
select pg_wal_lsn_diff(pg_current_wal_flush_lsn(),received_lsn) as received_lsn_lag,
|
|
pg_wal_lsn_diff(pg_current_wal_flush_lsn(),disk_consistent_lsn) as disk_consistent_lsn_lag,
|
|
pg_wal_lsn_diff(pg_current_wal_flush_lsn(),remote_consistent_lsn) as remote_consistent_lsn_lag,
|
|
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_flush_lsn(),received_lsn)),
|
|
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_flush_lsn(),disk_consistent_lsn)),
|
|
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_flush_lsn(),remote_consistent_lsn))
|
|
from backpressure_lsns();
|
|
''')
|
|
|
|
res = cur.fetchone()
|
|
received_lsn_lag = res[0]
|
|
disk_consistent_lsn_lag = res[1]
|
|
remote_consistent_lsn_lag = res[2]
|
|
|
|
log.info(f"received_lsn_lag = {received_lsn_lag} ({res[3]}), "
|
|
f"disk_consistent_lsn_lag = {disk_consistent_lsn_lag} ({res[4]}), "
|
|
f"remote_consistent_lsn_lag = {remote_consistent_lsn_lag} ({res[5]})")
|
|
|
|
# Since feedback from pageserver is not immediate, we should allow some lag overflow
|
|
lag_overflow = 5 * 1024 * 1024 # 5MB
|
|
|
|
if max_replication_write_lag_bytes > 0:
|
|
assert received_lsn_lag < max_replication_write_lag_bytes + lag_overflow
|
|
if max_replication_flush_lag_bytes > 0:
|
|
assert disk_consistent_lsn_lag < max_replication_flush_lag_bytes + lag_overflow
|
|
if max_replication_apply_lag_bytes > 0:
|
|
assert remote_consistent_lsn_lag < max_replication_apply_lag_bytes + lag_overflow
|
|
|
|
time.sleep(polling_interval)
|
|
|
|
except Exception as e:
|
|
log.info(f"backpressure check query failed: {e}")
|
|
stop_event.set()
|
|
|
|
log.info('check thread stopped')
|
|
|
|
|
|
# This test illustrates how to tune backpressure to control the lag
|
|
# between the WAL flushed on compute node and WAL digested by pageserver.
|
|
#
|
|
# To test it, throttle walreceiver ingest using failpoint and run heavy write load.
|
|
# If backpressure is disabled or not tuned properly, the query will timeout, because the walreceiver cannot keep up.
|
|
# If backpressure is enabled and tuned properly, insertion will be throttled, but the query will not timeout.
|
|
|
|
|
|
def test_backpressure_received_lsn_lag(zenith_env_builder: ZenithEnvBuilder):
|
|
zenith_env_builder.num_safekeepers = 1
|
|
env = zenith_env_builder.init_start()
|
|
# Create a branch for us
|
|
env.zenith_cli.create_branch("test_backpressure", "main")
|
|
|
|
pg = env.postgres.create_start('test_backpressure',
|
|
config_lines=['max_replication_write_lag=30MB'])
|
|
log.info("postgres is running on 'test_backpressure' branch")
|
|
|
|
# setup check thread
|
|
check_stop_event = threading.Event()
|
|
check_thread = threading.Thread(target=check_backpressure, args=(pg, check_stop_event))
|
|
check_thread.start()
|
|
|
|
# Configure failpoint to slow down walreceiver ingest
|
|
with closing(env.pageserver.connect()) as psconn:
|
|
with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
|
|
pscur.execute("failpoints walreceiver-after-ingest=sleep(20)")
|
|
|
|
# FIXME
|
|
# Wait for the check thread to start
|
|
#
|
|
# Now if load starts too soon,
|
|
# check thread cannot auth, because it is not able to connect to the database
|
|
# because of the lag and waiting for lsn to replay to arrive.
|
|
time.sleep(2)
|
|
|
|
with pg_cur(pg) as cur:
|
|
# Create and initialize test table
|
|
cur.execute("CREATE TABLE foo(x bigint)")
|
|
|
|
inserts_to_do = 2000000
|
|
rows_inserted = 0
|
|
|
|
while check_thread.is_alive() and rows_inserted < inserts_to_do:
|
|
try:
|
|
cur.execute("INSERT INTO foo select from generate_series(1, 100000)")
|
|
rows_inserted += 100000
|
|
except Exception as e:
|
|
if check_thread.is_alive():
|
|
log.info('stopping check thread')
|
|
check_stop_event.set()
|
|
check_thread.join()
|
|
assert False, f"Exception {e} while inserting rows, but WAL lag is within configured threshold. That means backpressure is not tuned properly"
|
|
else:
|
|
assert False, f"Exception {e} while inserting rows and WAL lag overflowed configured threshold. That means backpressure doesn't work."
|
|
|
|
log.info(f"inserted {rows_inserted} rows")
|
|
|
|
if check_thread.is_alive():
|
|
log.info('stopping check thread')
|
|
check_stop_event.set()
|
|
check_thread.join()
|
|
log.info('check thread stopped')
|
|
else:
|
|
assert False, "WAL lag overflowed configured threshold. That means backpressure doesn't work."
|
|
|
|
|
|
#TODO test_backpressure_disk_consistent_lsn_lag. Play with pageserver's checkpoint settings
|
|
#TODO test_backpressure_remote_consistent_lsn_lag
|