from contextlib import closing, contextmanager
import threading
import time

import psycopg2.extras

from fixtures.log_helper import log
from fixtures.zenith_fixtures import Postgres, ZenithEnvBuilder

# FIX: a parenthesized string is NOT a tuple — the trailing comma makes this the
# one-element tuple that was intended (pytest happens to accept a bare string
# too, so behavior is unchanged, but the intent is now explicit).
pytest_plugins = ("fixtures.zenith_fixtures", )


@contextmanager
def pg_cur(pg):
    """Yield a cursor on a fresh connection to *pg*; the connection is closed on exit."""
    with closing(pg.connect()) as conn:
        with conn.cursor() as cur:
            yield cur


def _read_lag_setting_bytes(cur, setting: str) -> int:
    """Return the value of a ``max_replication_*_lag`` GUC converted to bytes.

    A value <= 0 means the corresponding backpressure check is disabled.
    """
    # `setting` is an internal constant, never user input, so the f-string SQL is safe.
    cur.execute(f"select pg_size_bytes(current_setting('{setting}'))")
    value = cur.fetchone()[0]
    log.info(f"{setting}: {value} bytes")
    return value


# Periodically check that all backpressure lags are below the configured threshold,
# assert if they are not.
# If the check query fails, stop the thread. Main thread should notice that and stop the test.
def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interval=5):
    """Watchdog loop: verify backpressure lags stay within their configured thresholds.

    Runs until *stop_event* is set. On any failure (query error OR a lag
    overflowing its threshold — both land in the ``except`` below) the event is
    set and the loop exits; the main thread detects the dead thread and fails.
    """
    log.info("checks started")

    with pg_cur(pg) as cur:
        cur.execute("CREATE EXTENSION zenith")  # TODO move it to zenith_fixtures?

        # The three GUC reads were copy-pasted; one helper reads them all.
        max_replication_write_lag_bytes = _read_lag_setting_bytes(cur, 'max_replication_write_lag')
        max_replication_flush_lag_bytes = _read_lag_setting_bytes(cur, 'max_replication_flush_lag')
        max_replication_apply_lag_bytes = _read_lag_setting_bytes(cur, 'max_replication_apply_lag')

    with pg_cur(pg) as cur:
        while not stop_event.is_set():
            try:
                cur.execute('''
                select pg_wal_lsn_diff(pg_current_wal_flush_lsn(),received_lsn) as received_lsn_lag,
                pg_wal_lsn_diff(pg_current_wal_flush_lsn(),disk_consistent_lsn) as disk_consistent_lsn_lag,
                pg_wal_lsn_diff(pg_current_wal_flush_lsn(),remote_consistent_lsn) as remote_consistent_lsn_lag,
                pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_flush_lsn(),received_lsn)),
                pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_flush_lsn(),disk_consistent_lsn)),
                pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_flush_lsn(),remote_consistent_lsn))
                from backpressure_lsns();
                ''')

                res = cur.fetchone()
                received_lsn_lag = res[0]
                disk_consistent_lsn_lag = res[1]
                remote_consistent_lsn_lag = res[2]

                log.info(f"received_lsn_lag = {received_lsn_lag} ({res[3]}), "
                         f"disk_consistent_lsn_lag = {disk_consistent_lsn_lag} ({res[4]}), "
                         f"remote_consistent_lsn_lag = {remote_consistent_lsn_lag} ({res[5]})")

                # Since feedback from pageserver is not immediate, we should allow some lag overflow
                lag_overflow = 5 * 1024 * 1024  # 5MB

                if max_replication_write_lag_bytes > 0:
                    assert received_lsn_lag < max_replication_write_lag_bytes + lag_overflow
                if max_replication_flush_lag_bytes > 0:
                    assert disk_consistent_lsn_lag < max_replication_flush_lag_bytes + lag_overflow
                if max_replication_apply_lag_bytes > 0:
                    assert remote_consistent_lsn_lag < max_replication_apply_lag_bytes + lag_overflow

                time.sleep(polling_interval)

            except Exception as e:
                # Deliberately broad: an AssertionError from the checks above is
                # also an "overflow" signal — log it and stop, so the main
                # thread sees the thread die and fails the test.
                log.info(f"backpressure check query failed: {e}")
                stop_event.set()

    log.info('check thread stopped')


# This test illustrates how to tune backpressure to control the lag
# between the WAL flushed on compute node and WAL digested by pageserver.
#
# To test it, throttle walreceiver ingest using failpoint and run heavy write load.
# If backpressure is disabled or not tuned properly, the query will timeout, because the walreceiver cannot keep up.
# If backpressure is enabled and tuned properly, insertion will be throttled, but the query will not timeout.
def test_backpressure_received_lsn_lag(zenith_env_builder: ZenithEnvBuilder):
    """Heavy-write test: with a throttled walreceiver, backpressure must keep
    received_lsn_lag under max_replication_write_lag (+ small overflow)."""
    zenith_env_builder.num_safekeepers = 1
    env = zenith_env_builder.init()
    # Create a branch for us
    env.zenith_cli.create_branch("test_backpressure", "main")

    pg = env.postgres.create_start('test_backpressure',
                                   config_lines=['max_replication_write_lag=30MB'])
    log.info("postgres is running on 'test_backpressure' branch")

    # setup check thread
    check_stop_event = threading.Event()
    check_thread = threading.Thread(target=check_backpressure, args=(pg, check_stop_event))
    check_thread.start()

    # Configure failpoint to slow down walreceiver ingest
    with closing(env.pageserver.connect()) as psconn:
        with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
            pscur.execute("failpoints walreceiver-after-ingest=sleep(20)")

    # FIXME
    # Wait for the check thread to start
    #
    # Now if load starts too soon,
    # check thread cannot auth, because it is not able to connect to the database
    # because of the lag and waiting for lsn to replay to arrive.
    time.sleep(2)

    with pg_cur(pg) as cur:
        # Create and initialize test table
        cur.execute("CREATE TABLE foo(x bigint)")

        inserts_to_do = 2000000
        rows_inserted = 0

        while check_thread.is_alive() and rows_inserted < inserts_to_do:
            try:
                cur.execute("INSERT INTO foo select from generate_series(1, 100000)")
                rows_inserted += 100000
            except Exception as e:
                if check_thread.is_alive():
                    # Lag was within threshold but the insert still failed:
                    # backpressure is mis-tuned, not broken.
                    log.info('stopping check thread')
                    check_stop_event.set()
                    check_thread.join()
                    assert False, f"Exception {e} while inserting rows, but WAL lag is within configured threshold. That means backpressure is not tuned properly"
                else:
                    assert False, f"Exception {e} while inserting rows and WAL lag overflowed configured threshold. That means backpressure doesn't work."

        log.info(f"inserted {rows_inserted} rows")

    if check_thread.is_alive():
        log.info('stopping check thread')
        check_stop_event.set()
        check_thread.join()
        log.info('check thread stopped')
    else:
        # The watchdog died on its own: a lag overflowed its threshold.
        assert False, "WAL lag overflowed configured threshold. That means backpressure doesn't work."


#TODO test_backpressure_disk_consistent_lsn_lag.
# Play with pageserver's checkpoint settings
#TODO test_backpressure_remote_consistent_lsn_lag

# (from test_runner/batch_others/test_timeline_size.py; this hunk also bumps
# the vendor/postgres submodule d914790e6c -> a3709cc364, which provides
# backpressure_lsns() used below.)


# wait until received_lsn_lag is 0
def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60):
    """Poll *pgmain* until the pageserver's received_lsn catches up with
    pg_current_wal_flush_lsn() (i.e. received_lsn_lag reaches 0).

    Raises:
        RuntimeError: if the lag has not reached 0 within *timeout* seconds.
    """
    started_at = time.time()

    received_lsn_lag = 1
    while received_lsn_lag > 0:
        elapsed = time.time() - started_at
        if elapsed > timeout:
            # FIX: was an f-string with no placeholders (lint F541);
            # a plain string literal is equivalent.
            raise RuntimeError(
                "timed out waiting for pageserver to reach pg_current_wal_flush_lsn()")

        with closing(pgmain.connect()) as conn:
            with conn.cursor() as cur:

                cur.execute('''
                select pg_size_pretty(pg_cluster_size()),
                pg_wal_lsn_diff(pg_current_wal_flush_lsn(),received_lsn) as received_lsn_lag
                FROM backpressure_lsns();
                ''')
                res = cur.fetchone()
                log.info(f"pg_cluster_size = {res[0]}, received_lsn_lag = {res[1]}")
                received_lsn_lag = res[1]

        time.sleep(polling_interval)