From 29539b056100c7c0b3574ec13789ef91e9d748d9 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Wed, 27 Apr 2022 19:09:28 +0300 Subject: [PATCH] Set wal_keep_size to zero (#1507) wal_keep_size is already set to 0 in our cloud setup, but we don't use this value in tests. This commit fixes wal_keep_size in control_plane and adds tests for WAL recycling and lagging safekeepers. --- control_plane/src/compute.rs | 7 +-- test_runner/batch_others/test_wal_acceptor.py | 55 ++++++++++++++++++- .../batch_others/test_wal_acceptor_async.py | 37 ++++++++++--- test_runner/fixtures/utils.py | 11 ++++ 4 files changed, 95 insertions(+), 15 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 2549baca5d..92d0e080d8 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -273,12 +273,7 @@ impl PostgresNode { conf.append("wal_sender_timeout", "5s"); conf.append("listen_addresses", &self.address.ip().to_string()); conf.append("port", &self.address.port().to_string()); - - // Never clean up old WAL. TODO: We should use a replication - // slot or something proper, to prevent the compute node - // from removing WAL that hasn't been streamed to the safekeeper or - // page server yet. 
(gh issue #349) - conf.append("wal_keep_size", "10TB"); + conf.append("wal_keep_size", "0"); // Configure the node to fetch pages from pageserver let pageserver_connstr = { diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 395084af0e..94059e2a4c 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -13,7 +13,7 @@ from dataclasses import dataclass, field from multiprocessing import Process, Value from pathlib import Path from fixtures.zenith_fixtures import PgBin, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol -from fixtures.utils import etcd_path, lsn_to_hex, mkdir_if_needed, lsn_from_hex +from fixtures.utils import etcd_path, get_dir_size, lsn_to_hex, mkdir_if_needed, lsn_from_hex from fixtures.log_helper import log from typing import List, Optional, Any @@ -791,3 +791,56 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder): env.safekeepers[1].stop(immediate=True) execute_payload(pg) show_statuses(env.safekeepers, tenant_id, timeline_id) + + +# We have `wal_keep_size=0`, so postgres should trim WAL once it's broadcast +# to all safekeepers. This test checks that compute WAL can fit into a small number +# of WAL segments. 
+def test_wal_deleted_after_broadcast(zenith_env_builder: ZenithEnvBuilder): + # used to calculate delta in collect_stats + last_lsn = .0 + + # returns LSN and pg_wal size, all in MB + def collect_stats(pg: Postgres, cur, enable_logs=True): + nonlocal last_lsn + assert pg.pgdata_dir is not None + + log.info('executing INSERT to generate WAL') + cur.execute("select pg_current_wal_lsn()") + current_lsn = lsn_from_hex(cur.fetchone()[0]) / 1024 / 1024 + pg_wal_size = get_dir_size(os.path.join(pg.pgdata_dir, 'pg_wal')) / 1024 / 1024 + if enable_logs: + log.info(f"LSN delta: {current_lsn - last_lsn} MB, current WAL size: {pg_wal_size} MB") + last_lsn = current_lsn + return current_lsn, pg_wal_size + + # generates about ~20MB of WAL, to create at least one new segment + def generate_wal(cur): + cur.execute("INSERT INTO t SELECT generate_series(1,300000), 'payload'") + + zenith_env_builder.num_safekeepers = 3 + env = zenith_env_builder.init_start() + + env.zenith_cli.create_branch('test_wal_deleted_after_broadcast') + # Adjust checkpoint config to prevent keeping old WAL segments + pg = env.postgres.create_start( + 'test_wal_deleted_after_broadcast', + config_lines=['min_wal_size=32MB', 'max_wal_size=32MB', 'log_checkpoints=on']) + + pg_conn = pg.connect() + cur = pg_conn.cursor() + cur.execute('CREATE TABLE t(key int, value text)') + + collect_stats(pg, cur) + + # generate WAL to simulate normal workload + for i in range(5): + generate_wal(cur) + collect_stats(pg, cur) + + log.info('executing checkpoint') + cur.execute('CHECKPOINT') + wal_size_after_checkpoint = collect_stats(pg, cur)[1] + + # there shouldn't be more than 2 WAL segments (but dir may have archive_status files) + assert wal_size_after_checkpoint < 16 * 2.5 diff --git a/test_runner/batch_others/test_wal_acceptor_async.py b/test_runner/batch_others/test_wal_acceptor_async.py index e3df8ea3eb..c484b6401c 100644 --- a/test_runner/batch_others/test_wal_acceptor_async.py +++ 
b/test_runner/batch_others/test_wal_acceptor_async.py @@ -139,13 +139,12 @@ async def wait_for_lsn(safekeeper: Safekeeper, async def run_restarts_under_load(env: ZenithEnv, pg: Postgres, acceptors: List[Safekeeper], - n_workers=10): - n_accounts = 100 - init_amount = 100000 - max_transfer = 100 - period_time = 4 - iterations = 10 - + n_workers=10, + n_accounts=100, + init_amount=100000, + max_transfer=100, + period_time=4, + iterations=10): # Set timeout for this test at 5 minutes. It should be enough for test to complete # and less than CircleCI's no_output_timeout, taking into account that this timeout # is checked only at the beginning of every iteration. @@ -202,7 +201,7 @@ async def run_restarts_under_load(env: ZenithEnv, await pg_conn.close() -# restart acceptors one by one, while executing and validating bank transactions +# Restart acceptors one by one, while executing and validating bank transactions def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 3 env = zenith_env_builder.init_start() @@ -213,3 +212,25 @@ def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder): config_lines=['max_replication_write_lag=1MB']) asyncio.run(run_restarts_under_load(env, pg, env.safekeepers)) + + +# Restart acceptors one by one and test that everything is working as expected +# when checkpoints are triggered frequently by max_wal_size=32MB. Because we have +# wal_keep_size=0, there will be aggressive WAL segment recycling. 
+def test_restarts_frequent_checkpoints(zenith_env_builder: ZenithEnvBuilder): + zenith_env_builder.num_safekeepers = 3 + env = zenith_env_builder.init_start() + + env.zenith_cli.create_branch('test_restarts_frequent_checkpoints') + # Enable backpressure with 1MB maximal lag, because we don't want to block on `wait_for_lsn()` for too long + pg = env.postgres.create_start('test_restarts_frequent_checkpoints', + config_lines=[ + 'max_replication_write_lag=1MB', + 'min_wal_size=32MB', + 'max_wal_size=32MB', + 'log_checkpoints=on' + ]) + + # We try to simulate a large (flush_lsn - truncate_lsn) lag, to test that WAL segments + # are not removed before being broadcast to all safekeepers, with the help of a replication slot + asyncio.run(run_restarts_under_load(env, pg, env.safekeepers, period_time=15, iterations=5)) diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index f16fe1d9cf..98af511036 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -82,3 +82,14 @@ def print_gc_result(row): # path to etcd binary or None if not present. def etcd_path(): return shutil.which("etcd") + + +# Traverse directory to get total size. +def get_dir_size(path: str) -> int: + """Return size in bytes.""" + totalbytes = 0 + for root, dirs, files in os.walk(path): + for name in files: + totalbytes += os.path.getsize(os.path.join(root, name)) + + return totalbytes