Merge branch 'main' into basebackup-import

This commit is contained in:
Bojan Serafimov
2022-06-18 15:21:50 -04:00
6 changed files with 414 additions and 174 deletions

View File

@@ -2,6 +2,7 @@ import pytest
import random
import time
import os
import shutil
import signal
import subprocess
import sys
@@ -353,7 +354,7 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize('auth_enabled', [False, True])
def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
neon_env_builder.num_safekeepers = 2
# to advance remote_consistent_llsn
# to advance remote_consistent_lsn
neon_env_builder.enable_local_fs_remote_storage()
neon_env_builder.auth_enabled = auth_enabled
env = neon_env_builder.init_start()
@@ -437,6 +438,26 @@ def wait_segment_offload(tenant_id, timeline_id, live_sk, seg_end):
time.sleep(0.5)
def wait_wal_trim(tenant_id, timeline_id, sk, target_size):
started_at = time.time()
http_cli = sk.http_client()
while True:
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), tenant_id,
timeline_id)) / 1024 / 1024
log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size:.2f}MB status={tli_status}")
if sk_wal_size <= target_size:
break
elapsed = time.time() - started_at
if elapsed > 20:
raise RuntimeError(
f"timed out waiting {elapsed:.0f}s for sk_id={sk.id} to trim WAL to {target_size:.2f}MB, current size is {sk_wal_size:.2f}MB"
)
time.sleep(0.5)
@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs'])
def test_wal_backup(neon_env_builder: NeonEnvBuilder, storage_type: str):
neon_env_builder.num_safekeepers = 3
@@ -485,6 +506,116 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, storage_type: str):
wait_segment_offload(tenant_id, timeline_id, env.safekeepers[1], '0/5000000')
@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs'])
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, storage_type: str):
neon_env_builder.num_safekeepers = 3
if storage_type == 'local_fs':
neon_env_builder.enable_local_fs_remote_storage()
elif storage_type == 'mock_s3':
neon_env_builder.enable_s3_mock_remote_storage('test_s3_wal_replay')
else:
raise RuntimeError(f'Unknown storage type: {storage_type}')
neon_env_builder.remote_storage_users = RemoteStorageUsers.SAFEKEEPER
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_s3_wal_replay')
env.pageserver.stop()
pageserver_tenants_dir = os.path.join(env.repo_dir, 'tenants')
pageserver_fresh_copy = os.path.join(env.repo_dir, 'tenants_fresh')
log.info(f"Creating a copy of pageserver in a fresh state at {pageserver_fresh_copy}")
shutil.copytree(pageserver_tenants_dir, pageserver_fresh_copy)
env.pageserver.start()
pg = env.postgres.create_start('test_s3_wal_replay')
# learn neon timeline from compute
tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
timeline_id = pg.safe_psql("show neon.timeline_id")[0][0]
expected_sum = 0
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table t(key int, value text)")
cur.execute("insert into t values (1, 'payload')")
expected_sum += 1
offloaded_seg_end = ['0/3000000']
for seg_end in offloaded_seg_end:
# roughly fills two segments
cur.execute("insert into t select generate_series(1,500000), 'payload'")
expected_sum += 500000 * 500001 // 2
cur.execute("select sum(key) from t")
assert cur.fetchone()[0] == expected_sum
for sk in env.safekeepers:
wait_segment_offload(tenant_id, timeline_id, sk, seg_end)
# advance remote_consistent_lsn to trigger WAL trimming
# this LSN should be less than commit_lsn, so timeline will be active=true in safekeepers, to push etcd updates
env.safekeepers[0].http_client().record_safekeeper_info(
tenant_id, timeline_id, {'remote_consistent_lsn': offloaded_seg_end[-1]})
for sk in env.safekeepers:
# require WAL to be trimmed, so no more than one segment is left on disk
wait_wal_trim(tenant_id, timeline_id, sk, 16 * 1.5)
cur.execute('SELECT pg_current_wal_flush_lsn()')
last_lsn = cur.fetchone()[0]
pageserver_lsn = env.pageserver.http_client().timeline_detail(
uuid.UUID(tenant_id), uuid.UUID((timeline_id)))["local"]["last_record_lsn"]
lag = lsn_from_hex(last_lsn) - lsn_from_hex(pageserver_lsn)
log.info(
f'Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb'
)
# replace pageserver with a fresh copy
pg.stop_and_destroy()
env.pageserver.stop()
log.info(f'Removing current pageserver state at {pageserver_tenants_dir}')
shutil.rmtree(pageserver_tenants_dir)
log.info(f'Copying fresh pageserver state from {pageserver_fresh_copy}')
shutil.move(pageserver_fresh_copy, pageserver_tenants_dir)
# start pageserver and wait for replay
env.pageserver.start()
wait_lsn_timeout = 60 * 3
started_at = time.time()
last_debug_print = 0.0
while True:
elapsed = time.time() - started_at
if elapsed > wait_lsn_timeout:
raise RuntimeError(f'Timed out waiting for WAL redo')
pageserver_lsn = env.pageserver.http_client().timeline_detail(
uuid.UUID(tenant_id), uuid.UUID((timeline_id)))["local"]["last_record_lsn"]
lag = lsn_from_hex(last_lsn) - lsn_from_hex(pageserver_lsn)
if time.time() > last_debug_print + 10 or lag <= 0:
last_debug_print = time.time()
log.info(f'Pageserver last_record_lsn={pageserver_lsn}; lag is {lag / 1024}kb')
if lag <= 0:
break
time.sleep(1)
log.info(f'WAL redo took {elapsed} s')
# verify data
pg.create_start('test_s3_wal_replay')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("select sum(key) from t")
assert cur.fetchone()[0] == expected_sum
class ProposerPostgres(PgProtocol):
"""Object for running postgres without NeonEnv"""
def __init__(self,