Files
neon/test_runner/regress/test_timeline_gc_blocking.py
Joonas Koivunen 3dbd34aa78 feat(storcon): forward gc blocking and unblocking (#8956)
Currently using gc blocking and unblocking with storage controller
managed pageservers is painful. Implement the API on storage controller.

Fixes: #8893
2024-09-06 22:42:55 +01:00

126 lines
3.7 KiB
Python

import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import List, Optional
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
LogCursor,
NeonEnvBuilder,
NeonPageserver,
)
from fixtures.pageserver.utils import wait_timeline_detail_404
@pytest.mark.parametrize("sharded", [True, False])
def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool):
neon_env_builder.num_pageservers = 2 if sharded else 1
env = neon_env_builder.init_start(
initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"},
initial_tenant_shard_count=2 if sharded else None,
)
if sharded:
http = env.storage_controller.pageserver_api()
else:
http = env.pageserver.http_client()
pss = ManyPageservers(list(map(lambda ps: ScrollableLog(ps, None), env.pageservers)))
foo_branch = env.neon_cli.create_branch("foo", "main", env.initial_tenant)
gc_active_line = ".* gc_loop.*: [12] timelines need GC"
gc_skipped_line = ".* gc_loop.*: Skipping GC: .*"
init_gc_skipped = ".*: initialized with gc blocked.*"
tenant_before = http.tenant_status(env.initial_tenant)
wait_for_another_gc_round()
pss.assert_log_contains(gc_active_line)
pss.assert_log_does_not_contain(gc_skipped_line)
http.timeline_block_gc(env.initial_tenant, foo_branch)
tenant_after = http.tenant_status(env.initial_tenant)
assert tenant_before != tenant_after
gc_blocking = tenant_after["gc_blocking"]
assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }"
wait_for_another_gc_round()
pss.assert_log_contains(gc_skipped_line)
pss.restart()
pss.quiesce_tenants()
pss.assert_log_contains(init_gc_skipped)
wait_for_another_gc_round()
pss.assert_log_contains(gc_skipped_line)
# deletion unblocks gc
http.timeline_delete(env.initial_tenant, foo_branch)
wait_timeline_detail_404(http, env.initial_tenant, foo_branch, 10, 1.0)
wait_for_another_gc_round()
pss.assert_log_contains(gc_active_line)
http.timeline_block_gc(env.initial_tenant, env.initial_timeline)
wait_for_another_gc_round()
pss.assert_log_contains(gc_skipped_line)
# removing the manual block also unblocks gc
http.timeline_unblock_gc(env.initial_tenant, env.initial_timeline)
wait_for_another_gc_round()
pss.assert_log_contains(gc_active_line)
def wait_for_another_gc_round():
time.sleep(2)
@dataclass
class ScrollableLog:
pageserver: NeonPageserver
offset: Optional[LogCursor]
def assert_log_contains(self, what: str):
msg, offset = self.pageserver.assert_log_contains(what, offset=self.offset)
old = self.offset
self.offset = offset
log.info(f"{old} -> {offset}: {msg}")
def assert_log_does_not_contain(self, what: str):
assert self.pageserver.log_contains(what) is None
@dataclass(frozen=True)
class ManyPageservers:
many: List[ScrollableLog]
def assert_log_contains(self, what: str):
for one in self.many:
one.assert_log_contains(what)
def assert_log_does_not_contain(self, what: str):
for one in self.many:
one.assert_log_does_not_contain(what)
def restart(self):
def do_restart(x: ScrollableLog):
x.pageserver.restart()
with ThreadPoolExecutor(max_workers=len(self.many)) as rt:
rt.map(do_restart, self.many)
rt.shutdown(wait=True)
def quiesce_tenants(self):
def do_quiesce(x: ScrollableLog):
x.pageserver.quiesce_tenants()
with ThreadPoolExecutor(max_workers=len(self.many)) as rt:
rt.map(do_quiesce, self.many)
rt.shutdown(wait=True)