Set wal_keep_size to zero (#1507)

wal_keep_size is already set to 0 in our cloud setup, but we don't use this value in tests. This commit fixes wal_keep_size in control_plane and adds tests for WAL recycling and lagging safekeepers.
This commit is contained in:
Arthur Petukhovsky
2022-04-27 19:09:28 +03:00
committed by GitHub
parent 695b5f9d88
commit 29539b0561
4 changed files with 95 additions and 15 deletions

View File

@@ -273,12 +273,7 @@ impl PostgresNode {
conf.append("wal_sender_timeout", "5s");
conf.append("listen_addresses", &self.address.ip().to_string());
conf.append("port", &self.address.port().to_string());
// Never clean up old WAL. TODO: We should use a replication
// slot or something proper, to prevent the compute node
// from removing WAL that hasn't been streamed to the safekeeper or
// page server yet. (gh issue #349)
conf.append("wal_keep_size", "10TB");
conf.append("wal_keep_size", "0");
// Configure the node to fetch pages from pageserver
let pageserver_connstr = {

View File

@@ -13,7 +13,7 @@ from dataclasses import dataclass, field
from multiprocessing import Process, Value
from pathlib import Path
from fixtures.zenith_fixtures import PgBin, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
from fixtures.utils import etcd_path, lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.utils import etcd_path, get_dir_size, lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.log_helper import log
from typing import List, Optional, Any
@@ -791,3 +791,56 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
env.safekeepers[1].stop(immediate=True)
execute_payload(pg)
show_statuses(env.safekeepers, tenant_id, timeline_id)
# We have `wal_keep_size=0`, so postgres should trim WAL once it's broadcasted
# to all safekeepers. This test checks that compute WAL can fit into small number
# of WAL segments.
def test_wal_deleted_after_broadcast(zenith_env_builder: ZenithEnvBuilder):
# used to calculate delta in collect_stats
last_lsn = .0
# returns LSN and pg_wal size, all in MB
def collect_stats(pg: Postgres, cur, enable_logs=True):
nonlocal last_lsn
assert pg.pgdata_dir is not None
log.info('executing INSERT to generate WAL')
cur.execute("select pg_current_wal_lsn()")
current_lsn = lsn_from_hex(cur.fetchone()[0]) / 1024 / 1024
pg_wal_size = get_dir_size(os.path.join(pg.pgdata_dir, 'pg_wal')) / 1024 / 1024
if enable_logs:
log.info(f"LSN delta: {current_lsn - last_lsn} MB, current WAL size: {pg_wal_size} MB")
last_lsn = current_lsn
return current_lsn, pg_wal_size
# generates about ~20MB of WAL, to create at least one new segment
def generate_wal(cur):
cur.execute("INSERT INTO t SELECT generate_series(1,300000), 'payload'")
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_wal_deleted_after_broadcast')
# Adjust checkpoint config to prevent keeping old WAL segments
pg = env.postgres.create_start(
'test_wal_deleted_after_broadcast',
config_lines=['min_wal_size=32MB', 'max_wal_size=32MB', 'log_checkpoints=on'])
pg_conn = pg.connect()
cur = pg_conn.cursor()
cur.execute('CREATE TABLE t(key int, value text)')
collect_stats(pg, cur)
# generate WAL to simulate normal workload
for i in range(5):
generate_wal(cur)
collect_stats(pg, cur)
log.info('executing checkpoint')
cur.execute('CHECKPOINT')
wal_size_after_checkpoint = collect_stats(pg, cur)[1]
# there shouldn't be more than 2 WAL segments (but dir may have archive_status files)
assert wal_size_after_checkpoint < 16 * 2.5

View File

@@ -139,13 +139,12 @@ async def wait_for_lsn(safekeeper: Safekeeper,
async def run_restarts_under_load(env: ZenithEnv,
pg: Postgres,
acceptors: List[Safekeeper],
n_workers=10):
n_accounts = 100
init_amount = 100000
max_transfer = 100
period_time = 4
iterations = 10
n_workers=10,
n_accounts=100,
init_amount=100000,
max_transfer=100,
period_time=4,
iterations=10):
# Set timeout for this test at 5 minutes. It should be enough for test to complete
# and less than CircleCI's no_output_timeout, taking into account that this timeout
# is checked only at the beginning of every iteration.
@@ -202,7 +201,7 @@ async def run_restarts_under_load(env: ZenithEnv,
await pg_conn.close()
# restart acceptors one by one, while executing and validating bank transactions
# Restart acceptors one by one, while executing and validating bank transactions
def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
@@ -213,3 +212,25 @@ def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder):
config_lines=['max_replication_write_lag=1MB'])
asyncio.run(run_restarts_under_load(env, pg, env.safekeepers))
# Restart acceptors one by one and test that everything is working as expected
# when checkpoins are triggered frequently by max_wal_size=32MB. Because we have
# wal_keep_size=0, there will be aggressive WAL segments recycling.
def test_restarts_frequent_checkpoints(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_restarts_frequent_checkpoints')
# Enable backpressure with 1MB maximal lag, because we don't want to block on `wait_for_lsn()` for too long
pg = env.postgres.create_start('test_restarts_frequent_checkpoints',
config_lines=[
'max_replication_write_lag=1MB',
'min_wal_size=32MB',
'max_wal_size=32MB',
'log_checkpoints=on'
])
# we try to simulate large (flush_lsn - truncate_lsn) lag, to test that WAL segments
# are not removed before broadcasted to all safekeepers, with the help of replication slot
asyncio.run(run_restarts_under_load(env, pg, env.safekeepers, period_time=15, iterations=5))

View File

@@ -82,3 +82,14 @@ def print_gc_result(row):
# path to etcd binary or None if not present.
def etcd_path():
return shutil.which("etcd")
# Traverse directory to get total size.
def get_dir_size(path: str) -> int:
"""Return size in bytes."""
totalbytes = 0
for root, dirs, files in os.walk(path):
for name in files:
totalbytes += os.path.getsize(os.path.join(root, name))
return totalbytes