Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-16 18:02:56 +00:00
Part of #7497, closes https://github.com/neondatabase/neon/issues/8890.

## Problem

Since leases are in-memory objects, we need to take special care of them after a pageserver restart and during a live migration. For pageserver restarts, the approach is to wait for at least one lease duration before running the first GC. We want the same behavior for live migrations. Since we do not run any GC while a tenant is in `AttachedStale` or `AttachedMulti` mode, only the transition from `AttachedMulti` to `AttachedSingle` needs this treatment.

## Summary of changes

- Added an `lsn_lease_deadline` field to `GcBlock::reasons`: the tenant is temporarily blocked from GC until the deadline is reached. This information is not persisted to S3.
- In `GcBlock::start`, skip the GC iteration if we are blocked by the lsn lease deadline.
- In `TenantManager::upsert_location`, set `lsn_lease_deadline` to `Instant::now() + lsn_lease_length` so that granted leases have a chance to be renewed before the first GC run after the transition from `AttachedMulti` to `AttachedSingle`.

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
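The deadline mechanism can be summarized with a small model. Below is a minimal Python sketch of the blocking logic described above, not the pageserver's actual Rust implementation; the class and method names are illustrative only:

```python
import time
from dataclasses import dataclass, field
from typing import Optional, Set


@dataclass
class GcBlockModel:
    # Illustrative stand-in for the pageserver's GcBlock; not the real API.
    reasons: Set[str] = field(default_factory=set)  # e.g. {"Manual"}
    lsn_lease_deadline: Optional[float] = None  # in-memory only, never written to S3

    def on_attached_single(self, lsn_lease_length: float) -> None:
        # AttachedMulti -> AttachedSingle transition: give existing leases one
        # full lease period to be renewed before the first GC run.
        self.lsn_lease_deadline = time.monotonic() + lsn_lease_length

    def gc_is_blocked(self) -> bool:
        # The GC loop skips its iteration while any reason is set or the
        # lease deadline has not yet passed.
        if self.reasons:
            return True
        return (
            self.lsn_lease_deadline is not None
            and time.monotonic() < self.lsn_lease_deadline
        )
```

Under this model, `upsert_location` would call something like `on_attached_single(lsn_lease_length)` when the tenant transitions to `AttachedSingle`, and the GC loop would consult `gc_is_blocked()` before each iteration.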
129 lines
3.8 KiB
Python
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import List, Optional

import pytest

from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    LogCursor,
    NeonEnvBuilder,
    NeonPageserver,
)
from fixtures.pageserver.utils import wait_timeline_detail_404


@pytest.mark.parametrize("sharded", [True, False])
def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool):
    """
    Blocking GC on a timeline pauses the tenant's GC loop, the block stays in
    effect across a pageserver restart, and deleting the timeline or unblocking
    it resumes GC.
    """
    neon_env_builder.num_pageservers = 2 if sharded else 1
    env = neon_env_builder.init_start(
        initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"},
        initial_tenant_shard_count=2 if sharded else None,
    )

    if sharded:
        http = env.storage_controller.pageserver_api()
    else:
        http = env.pageserver.http_client()

    pss = ManyPageservers(list(map(lambda ps: ScrollableLog(ps, None), env.pageservers)))

    foo_branch = env.neon_cli.create_branch("foo", "main", env.initial_tenant)

    gc_active_line = ".* gc_loop.*: [12] timelines need GC"
    gc_skipped_line = ".* gc_loop.*: Skipping GC: .*"
    init_gc_skipped = ".*: initialized with gc blocked.*"

    tenant_before = http.tenant_status(env.initial_tenant)

    wait_for_another_gc_round()
    pss.assert_log_contains(gc_active_line)
    pss.assert_log_does_not_contain(gc_skipped_line)

    http.timeline_block_gc(env.initial_tenant, foo_branch)

    tenant_after = http.tenant_status(env.initial_tenant)
    assert tenant_before != tenant_after
    gc_blocking = tenant_after["gc_blocking"]
    assert (
        gc_blocking
        == "BlockingReasons { tenant_blocked_by_lsn_lease_deadline: false, timelines: 1, reasons: EnumSet(Manual) }"
    )

    wait_for_another_gc_round()
    pss.assert_log_contains(gc_skipped_line)

    # the manual block is still in effect after a restart
    pss.restart()
    pss.quiesce_tenants()

    pss.assert_log_contains(init_gc_skipped)

    wait_for_another_gc_round()
    pss.assert_log_contains(gc_skipped_line)

    # deletion unblocks gc
    http.timeline_delete(env.initial_tenant, foo_branch)
    wait_timeline_detail_404(http, env.initial_tenant, foo_branch, 10, 1.0)

    wait_for_another_gc_round()
    pss.assert_log_contains(gc_active_line)

    http.timeline_block_gc(env.initial_tenant, env.initial_timeline)

    wait_for_another_gc_round()
    pss.assert_log_contains(gc_skipped_line)

    # removing the manual block also unblocks gc
    http.timeline_unblock_gc(env.initial_tenant, env.initial_timeline)

    wait_for_another_gc_round()
    pss.assert_log_contains(gc_active_line)


def wait_for_another_gc_round():
    # gc_period is 1s, so sleeping 2s should cover at least one more GC iteration
    time.sleep(2)


@dataclass
class ScrollableLog:
    """A pageserver paired with a log cursor, so assertions only scan new log lines."""

    pageserver: NeonPageserver
    offset: Optional[LogCursor]

    def assert_log_contains(self, what: str):
        msg, offset = self.pageserver.assert_log_contains(what, offset=self.offset)
        old = self.offset
        self.offset = offset
        log.info(f"{old} -> {offset}: {msg}")

    def assert_log_does_not_contain(self, what: str):
        assert self.pageserver.log_contains(what) is None


@dataclass(frozen=True)
class ManyPageservers:
    """Fans log assertions, restarts, and quiescing out to all pageservers."""

    many: List[ScrollableLog]

    def assert_log_contains(self, what: str):
        for one in self.many:
            one.assert_log_contains(what)

    def assert_log_does_not_contain(self, what: str):
        for one in self.many:
            one.assert_log_does_not_contain(what)

    def restart(self):
        def do_restart(x: ScrollableLog):
            x.pageserver.restart()

        with ThreadPoolExecutor(max_workers=len(self.many)) as rt:
            rt.map(do_restart, self.many)
            rt.shutdown(wait=True)

    def quiesce_tenants(self):
        def do_quiesce(x: ScrollableLog):
            x.pageserver.quiesce_tenants()

        with ThreadPoolExecutor(max_workers=len(self.many)) as rt:
            rt.map(do_quiesce, self.many)
            rt.shutdown(wait=True)