neon/test_runner/regress/test_timeline_detach_ancestor.py
from __future__ import annotations
import datetime
import enum
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from enum import StrEnum
from queue import Empty, Queue
from threading import Barrier
import pytest
import requests
from fixtures.common_types import Lsn, TimelineArchivalState, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
LogCursor,
NeonEnvBuilder,
PgBin,
flush_ep_to_pageserver,
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import (
HistoricLayerInfo,
PageserverApiException,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_timeline_detail_404
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.utils import assert_pageserver_backups_equal, skip_in_debug_build, wait_until
from fixtures.workload import Workload
from requests import ReadTimeout
def by_end_lsn(info: HistoricLayerInfo) -> Lsn:
assert info.lsn_end is not None
return Lsn(info.lsn_end)
def layer_name(info: HistoricLayerInfo) -> str:
return info.layer_file_name
@enum.unique
class Branchpoint(StrEnum):
"""
Create branches at these LSNs, positioned relative to an L0 layer boundary.
"""
EARLIER = "earlier"
AT_L0 = "at"
AFTER_L0 = "after"
LAST_RECORD_LSN = "head"
def __str__(self) -> str:
return self.value
@staticmethod
def all() -> list[Branchpoint]:
return [
Branchpoint.EARLIER,
Branchpoint.AT_L0,
Branchpoint.AFTER_L0,
Branchpoint.LAST_RECORD_LSN,
]
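# pageserver log error patterns tolerated across these tests: they stop and restart
# pageservers mid-operation, which can interrupt the initial logical size calculation
# and layer flushing during shutdown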
SHUTDOWN_ALLOWED_ERRORS = [
".*initial size calculation failed: downloading failed, possibly for shutdown",
".*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited",
".*logical_size_calculation_task:panic.*: Sequential get failed with Bad state \\(not active\\).*",
".*Task 'initial size calculation' .* panicked.*",
]
@pytest.mark.parametrize("branchpoint", Branchpoint.all())
@pytest.mark.parametrize("restart_after", [True, False])
@pytest.mark.parametrize("write_to_branch_first", [True, False])
def test_ancestor_detach_branched_from(
test_output_dir,
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
branchpoint: Branchpoint,
restart_after: bool,
write_to_branch_first: bool,
):
"""
Creates a branch relative to the L0 LSN boundary according to Branchpoint. The timeline is later detached from its ancestor.
"""
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
client = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
ep.safe_psql("CREATE TABLE foo (i BIGINT);")
after_first_tx = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(0, 8191) g(i);")
# create a single layer for us to remote copy
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
client.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(8192, 16383) g(i);")
flush_ep_to_pageserver(env, ep, env.initial_tenant, env.initial_timeline)
deltas = client.layer_map_info(env.initial_tenant, env.initial_timeline).delta_layers()
# there is also the in-mem layer, but ignore it for now
assert len(deltas) == 2, "expecting there to be two deltas: initdb and checkpointed"
later_delta = max(deltas, key=by_end_lsn)
assert later_delta.lsn_end is not None
# -1 as the lsn_end is exclusive.
last_lsn = Lsn(later_delta.lsn_end).lsn_int - 1
if branchpoint == Branchpoint.EARLIER:
branch_at = after_first_tx
rows = 0
truncated_layers = 1
elif branchpoint == Branchpoint.AT_L0:
branch_at = Lsn(last_lsn)
rows = 8192
truncated_layers = 0
elif branchpoint == Branchpoint.AFTER_L0:
branch_at = Lsn(last_lsn + 8)
# make sure the branch point is not on a page header
if 0 < (branch_at.lsn_int % 8192) < 40:
branch_at += 40
rows = 8192
# as there is no 8 byte walrecord, nothing should get copied from the straddling layer
truncated_layers = 0
else:
# this case also covers the implicit flush of the ancestor, as the in-memory layer hasn't been flushed yet
assert branchpoint == Branchpoint.LAST_RECORD_LSN
branch_at = None
rows = 16384
truncated_layers = 0
name = "new main"
timeline_id = env.create_branch(name, ancestor_branch_name="main", ancestor_start_lsn=branch_at)
recorded = Lsn(client.timeline_detail(env.initial_tenant, timeline_id)["ancestor_lsn"])
if branch_at is None:
# fix it up if we need it later (currently unused)
branch_at = recorded
else:
assert branch_at == recorded, "the test should not use unaligned lsns"
if write_to_branch_first:
with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
# make sure the ep is writable
# with AT_L0, AFTER_L0 there will be a gap in Lsns caused by accurate end_lsn on straddling layers
ep.safe_psql("CREATE TABLE audit AS SELECT 1 as starts;")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, timeline_id)
# branch must have a flush for "PREV_LSN: none"
client.timeline_checkpoint(env.initial_tenant, timeline_id)
branch_layers = set(
map(layer_name, client.layer_map_info(env.initial_tenant, timeline_id).historic_layers)
)
else:
branch_layers = set()
# run fullbackup to make sure there are no off by one errors
# take this on the parent
fullbackup_before = test_output_dir / "fullbackup-before.tar"
pg_bin.take_fullbackup(
env.pageserver, env.initial_tenant, env.initial_timeline, branch_at, fullbackup_before
)
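# detach the branch from its ancestor; the returned set contains the timelines that were
# reparented onto the detached timeline, and with no other branches present it must be empty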
all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert all_reparented == set()
if restart_after:
env.pageserver.stop()
env.pageserver.start()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == 16384
with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
old_main_info = client.layer_map_info(env.initial_tenant, env.initial_timeline)
old_main = set(map(layer_name, old_main_info.historic_layers))
new_main_info = client.layer_map_info(env.initial_tenant, timeline_id)
new_main = set(map(layer_name, new_main_info.historic_layers))
new_main_copied_or_truncated = new_main - branch_layers
new_main_truncated = new_main_copied_or_truncated - old_main
assert len(new_main_truncated) == truncated_layers
# could additionally check that the symmetric difference has layers starting at the same lsn
# but if nothing was copied, then there is no nice rule.
# there could be a hole in LSNs between the layers copied from the "old main" and the first branch layer.
# take this on the detached, at same lsn
fullbackup_after = test_output_dir / "fullbackup-after.tar"
pg_bin.take_fullbackup(
env.pageserver, env.initial_tenant, timeline_id, branch_at, fullbackup_after
)
client.timeline_delete(env.initial_tenant, env.initial_timeline)
wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline)
# because we do the fullbackup from ancestor at the branch_lsn, the neon.signal and/or zenith.signal is always
# different as there is always "PREV_LSN: invalid" for "before"
skip_files = {"zenith.signal", "neon.signal"}
assert_pageserver_backups_equal(fullbackup_before, fullbackup_after, skip_files)
def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder):
"""
The case from RFC:
                          +-> another branch with same ancestor_lsn as new main
                          |
old main -------|---------X--------->
                |         |         |
                |         |         +-> after
                |         |
                |         +-> new main
                |
                +-> reparented

Ends up as:

old main --------------------------->
                                    |
                                    +-> after

                          +-> another branch with same ancestor_lsn as new main
                          |
new main -------|---------|->
                |
                +-> reparented
We confirm the end result by being able to delete "old main" after deleting "after".
"""
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
client = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
ep.safe_psql("CREATE TABLE foo (i BIGINT);")
ep.safe_psql("CREATE TABLE audit AS SELECT 1 as starts;")
branchpoint_pipe = wait_for_last_flush_lsn(
env, ep, env.initial_tenant, env.initial_timeline
)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(0, 8191) g(i);")
branchpoint_x = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
client.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(8192, 16383) g(i);")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
# as this only gets reparented, we don't need to write to it like new main
reparented = env.create_branch(
"reparented", ancestor_branch_name="main", ancestor_start_lsn=branchpoint_pipe
)
same_branchpoint = env.create_branch(
"same_branchpoint", ancestor_branch_name="main", ancestor_start_lsn=branchpoint_x
)
timeline_id = env.create_branch(
"new main", ancestor_branch_name="main", ancestor_start_lsn=branchpoint_x
)
after = env.create_branch("after", ancestor_branch_name="main", ancestor_start_lsn=None)
all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert set(all_reparented) == {reparented, same_branchpoint}
env.pageserver.quiesce_tenants()
# checking the ancestor afterwards is much faster than waiting for the endpoint to start
expected_result = [
("main", env.initial_timeline, None, 16384, 1),
("after", after, env.initial_timeline, 16384, 1),
("new main", timeline_id, None, 8192, 1),
("same_branchpoint", same_branchpoint, timeline_id, 8192, 1),
("reparented", reparented, timeline_id, 0, 1),
]
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
for _, queried_timeline, expected_ancestor, _, _ in expected_result:
details = client.timeline_detail(env.initial_tenant, queried_timeline)
ancestor_timeline_id = details["ancestor_timeline_id"]
if expected_ancestor is None:
assert ancestor_timeline_id is None
else:
assert TimelineId(ancestor_timeline_id) == expected_ancestor
index_part = env.pageserver_remote_storage.index_content(
env.initial_tenant, queried_timeline
)
lineage = index_part["lineage"]
assert lineage is not None
assert lineage.get("reparenting_history_overflown", "false") == "false"
if queried_timeline == timeline_id:
original_ancestor = lineage["original_ancestor"]
assert original_ancestor is not None
assert original_ancestor[0] == str(env.initial_timeline)
assert original_ancestor[1] == str(branchpoint_x)
# the timestamp does not have a trailing Z, so fromisoformat accepts it
# it is meant to be in line with the deletion timestamp.. well, almost.
when = original_ancestor[2][:26]
when_ts = datetime.datetime.fromisoformat(when).replace(tzinfo=datetime.UTC)
now = datetime.datetime.now(datetime.UTC)
assert when_ts < now
assert len(lineage.get("reparenting_history", [])) == 0
elif expected_ancestor == timeline_id:
assert len(lineage.get("original_ancestor", [])) == 0
assert lineage["reparenting_history"] == [str(env.initial_timeline)]
else:
assert len(lineage.get("original_ancestor", [])) == 0
assert len(lineage.get("reparenting_history", [])) == 0
for name, _, _, rows, starts in expected_result:
with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
assert ep.safe_psql(f"SELECT count(*) FROM audit WHERE starts = {starts}")[0][0] == 1
# delete the timelines to confirm detach actually worked
client.timeline_delete(env.initial_tenant, after)
wait_timeline_detail_404(client, env.initial_tenant, after)
client.timeline_delete(env.initial_tenant, env.initial_timeline)
wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline)
@pytest.mark.parametrize("snapshots_archived", ["archived", "normal"])
def test_ancestor_detach_behavior_v2(neon_env_builder: NeonEnvBuilder, snapshots_archived: str):
"""
Test the v2 behavior of ancestor detach.
old main -------|---------X--------->
                |         |         |
                |         |         +-> after
                |         +--X empty snapshot branch
                |            |
                |            +-> branch-to-detach
                |
                +-> earlier

Ends up as:

old main -------|---------X--------->
                |         |         |
                |         |         +-> after
                |         +--X empty snapshot branch
                |
                +-> earlier

new main -------|---------|----> branch-to-detach
"""
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
client = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
ep.safe_psql("CREATE TABLE foo (i BIGINT);")
ep.safe_psql("CREATE TABLE audit AS SELECT 1 as starts;")
branchpoint_pipe = wait_for_last_flush_lsn(
env, ep, env.initial_tenant, env.initial_timeline
)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(0, 8191) g(i);")
branchpoint_y = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
client.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(0, 8191) g(i);")
branchpoint_x = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
client.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(8192, 16383) g(i);")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
earlier = env.create_branch(
"earlier", ancestor_branch_name="main", ancestor_start_lsn=branchpoint_pipe
)
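# create a read-only snapshot branch directly through the storage controller; no safekeeper
# timeline exists for it, which the Not Found assertion below verifies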
snapshot_branchpoint_old = TimelineId.generate()
env.storage_controller.timeline_create(
env.initial_tenant,
{
"new_timeline_id": str(snapshot_branchpoint_old),
"ancestor_start_lsn": str(branchpoint_y),
"ancestor_timeline_id": str(env.initial_timeline),
"read_only": True,
},
)
sk = env.safekeepers[0]
assert sk
with pytest.raises(requests.exceptions.HTTPError, match="Not Found"):
sk.http_client().timeline_status(
tenant_id=env.initial_tenant, timeline_id=snapshot_branchpoint_old
)
env.neon_cli.mappings_map_branch(
"snapshot_branchpoint_old", env.initial_tenant, snapshot_branchpoint_old
)
snapshot_branchpoint = env.create_branch(
"snapshot_branchpoint", ancestor_branch_name="main", ancestor_start_lsn=branchpoint_x
)
branch_to_detach = env.create_branch(
"branch_to_detach",
ancestor_branch_name="snapshot_branchpoint",
ancestor_start_lsn=branchpoint_x,
)
after = env.create_branch("after", ancestor_branch_name="main", ancestor_start_lsn=None)
if snapshots_archived == "archived":
# archive the previous snapshot branchpoint
client.timeline_archival_config(
env.initial_tenant, snapshot_branchpoint_old, TimelineArchivalState.ARCHIVED
)
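# with the v2 behavior the detached branch becomes a root timeline and no siblings are
# reparented, as asserted below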
all_reparented = client.detach_ancestor(
env.initial_tenant, branch_to_detach, detach_behavior="v2"
)
assert set(all_reparented) == set()
if snapshots_archived == "archived":
# restore the branchpoint so that we can query from the endpoint
client.timeline_archival_config(
env.initial_tenant, snapshot_branchpoint_old, TimelineArchivalState.UNARCHIVED
)
env.pageserver.quiesce_tenants()
# checking the ancestor after is much faster than waiting for the endpoint to start
expected_result = [
# (branch_name, queried_timeline, expected_ancestor, rows, starts, read_only)
("main", env.initial_timeline, None, 24576, 1, False),
("after", after, env.initial_timeline, 24576, 1, False),
("snapshot_branchpoint_old", snapshot_branchpoint_old, env.initial_timeline, 8192, 1, True),
("snapshot_branchpoint", snapshot_branchpoint, env.initial_timeline, 16384, 1, False),
("branch_to_detach", branch_to_detach, None, 16384, 1, False),
("earlier", earlier, env.initial_timeline, 0, 1, False),
]
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
for branch_name, queried_timeline, expected_ancestor, _, _, _ in expected_result:
details = client.timeline_detail(env.initial_tenant, queried_timeline)
ancestor_timeline_id = details["ancestor_timeline_id"]
if expected_ancestor is None:
assert ancestor_timeline_id is None
else:
assert TimelineId(ancestor_timeline_id) == expected_ancestor, (
f"when checking branch {branch_name}, mapping={expected_result}"
)
index_part = env.pageserver_remote_storage.index_content(
env.initial_tenant, queried_timeline
)
lineage = index_part["lineage"]
assert lineage is not None
assert lineage.get("reparenting_history_overflown", "false") == "false"
if queried_timeline == branch_to_detach:
original_ancestor = lineage["original_ancestor"]
assert original_ancestor is not None
assert original_ancestor[0] == str(env.initial_timeline)
assert original_ancestor[1] == str(branchpoint_x)
# the timestamp does not have a trailing Z, so fromisoformat accepts it
# it is meant to be in line with the deletion timestamp.. well, almost.
when = original_ancestor[2][:26]
when_ts = datetime.datetime.fromisoformat(when).replace(tzinfo=datetime.UTC)
now = datetime.datetime.now(datetime.UTC)
assert when_ts < now
assert len(lineage.get("reparenting_history", [])) == 0
elif expected_ancestor == branch_to_detach:
assert len(lineage.get("original_ancestor", [])) == 0
assert lineage["reparenting_history"] == [str(env.initial_timeline)]
else:
assert len(lineage.get("original_ancestor", [])) == 0
assert len(lineage.get("reparenting_history", [])) == 0
for branch_name, queried_timeline, _, rows, starts, read_only in expected_result:
last_record_lsn = None
if read_only:
# specifying the lsn makes the endpoint read-only and not connect to safekeepers
details = client.timeline_detail(env.initial_tenant, queried_timeline)
last_record_lsn = Lsn(details["last_record_lsn"])
log.info(f"reading data from branch {branch_name} at {last_record_lsn}")
with env.endpoints.create(
branch_name,
lsn=last_record_lsn,
) as ep:
ep.start(safekeeper_generation=1)
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
assert ep.safe_psql(f"SELECT count(*) FROM audit WHERE starts = {starts}")[0][0] == 1
# delete the new timeline to confirm it doesn't carry over anything from the old timeline
client.timeline_delete(env.initial_tenant, branch_to_detach)
wait_timeline_detail_404(client, env.initial_tenant, branch_to_detach)
# delete the after timeline
client.timeline_delete(env.initial_tenant, after)
wait_timeline_detail_404(client, env.initial_tenant, after)
def test_detached_receives_flushes_while_being_detached(neon_env_builder: NeonEnvBuilder):
"""
Makes sure that the timeline is able to receive writes throughout the detach process.
"""
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
# row counts have been manually verified to cause reconnections and getpage
# requests when restart_after=False with pg16
def insert_rows(n: int, ep) -> int:
ep.safe_psql(
f"INSERT INTO foo SELECT i::bigint, 'more info!! this is a long string' || i FROM generate_series(0, {n - 1}) g(i);"
)
return n
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
ep.safe_psql("CREATE EXTENSION neon_test_utils;")
ep.safe_psql("CREATE TABLE foo (i BIGINT, aux TEXT NOT NULL);")
rows = insert_rows(256, ep)
branchpoint = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
timeline_id = env.create_branch(
"new main", ancestor_branch_name="main", ancestor_start_lsn=branchpoint
)
log.info("starting the new main endpoint")
ep = env.endpoints.create_start("new main", tenant_id=env.initial_tenant)
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
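# worker which keeps inserting single rows until told to stop via the queue; the barrier
# synchronizes its start with the detach request issued by the main thread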
def small_txs(ep, queue: Queue[str], barrier):
extra_rows = 0
with ep.connect() as conn:
while True:
try:
queue.get_nowait()
break
except Empty:
pass
if barrier is not None:
barrier.wait()
barrier = None
cursor = conn.cursor()
cursor.execute(
"INSERT INTO foo(i, aux) VALUES (1, 'more info!! this is a long string' || 1);"
)
extra_rows += 1
return extra_rows
with ThreadPoolExecutor(max_workers=1) as exec:
queue: Queue[str] = Queue()
barrier = Barrier(2)
completion = exec.submit(small_txs, ep, queue, barrier)
barrier.wait()
reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert len(reparented) == 0
env.pageserver.quiesce_tenants()
queue.put("done")
extra_rows = completion.result()
assert extra_rows > 0, "some rows should have been written"
rows += extra_rows
assert client.timeline_detail(env.initial_tenant, timeline_id)["ancestor_timeline_id"] is None
ep.clear_buffers()
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
assert ep.safe_psql("SELECT SUM(LENGTH(aux)) FROM foo")[0][0] != 0
ep.stop()
# finally restart the endpoint and make sure we still have the same answer
with env.endpoints.create_start("new main", tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
def test_compaction_induced_by_detaches_in_history(
neon_env_builder: NeonEnvBuilder, test_output_dir, pg_bin: PgBin
):
"""
Assuming the tree of timelines:
root
|- child1
   |- ...
      |- wanted_detached_child
Each detach can add N more L0s per level; this is effectively unbounded because
compaction can be arbitrarily delayed (or a detach can happen right before one
starts). If "wanted_detached_child" has already made progress and compacted
L1s, we want to make sure "compaction in the history" does not leave the
timeline broken.
"""
env = neon_env_builder.init_start(
initial_tenant_conf={
# we want to create layers manually so we don't branch on arbitrary
# Lsn, but we also do not want to compact L0 -> L1.
"compaction_threshold": "99999",
"compaction_period": "0s",
# shouldn't matter, but just in case
"gc_period": "0s",
}
)
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
env.pageserver.allowed_errors.append(
".*await_initial_logical_size: can't get semaphore cancel token, skipping"
)
client = env.pageserver.http_client()
def delta_layers(timeline_id: TimelineId):
# shorthand for more readable formatting
return client.layer_map_info(env.initial_tenant, timeline_id).delta_layers()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
ep.safe_psql("create table integers (i bigint not null);")
ep.safe_psql("insert into integers (i) values (42)")
branch_lsn = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
client.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
assert len(delta_layers(env.initial_timeline)) == 2
more_good_numbers = range(0, 3)
branches: list[tuple[str, TimelineId]] = [("main", env.initial_timeline)]
for num in more_good_numbers:
branch_name = f"br-{len(branches)}"
branch_timeline_id = env.create_branch(
branch_name,
ancestor_branch_name=branches[-1][0],
ancestor_start_lsn=branch_lsn,
)
branches.append((branch_name, branch_timeline_id))
with env.endpoints.create_start(branches[-1][0], tenant_id=env.initial_tenant) as ep:
ep.safe_psql(
f"insert into integers (i) select i from generate_series({num}, {num + 100}) as s(i)"
)
branch_lsn = wait_for_last_flush_lsn(env, ep, env.initial_tenant, branch_timeline_id)
client.timeline_checkpoint(env.initial_tenant, branch_timeline_id)
assert len(delta_layers(branch_timeline_id)) == 1
# now fill in the final, most growing timeline
branch_name, branch_timeline_id = branches[-1]
with env.endpoints.create_start(branch_name, tenant_id=env.initial_tenant) as ep:
ep.safe_psql("insert into integers (i) select i from generate_series(50, 500) s(i)")
last_suffix = None
for suffix in range(0, 4):
ep.safe_psql(f"create table other_table_{suffix} as select * from integers")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, branch_timeline_id)
client.timeline_checkpoint(env.initial_tenant, branch_timeline_id)
last_suffix = suffix
assert last_suffix is not None
assert len(delta_layers(branch_timeline_id)) == 5
env.storage_controller.pageserver_api().update_tenant_config(
env.initial_tenant, {"compaction_threshold": 5}, None
)
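# with the threshold lowered to 5, this compaction should turn the existing L0s into L1s
# (see the diagram further below)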
client.timeline_compact(env.initial_tenant, branch_timeline_id)
# one more layer
ep.safe_psql(f"create table other_table_{last_suffix + 1} as select * from integers")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, branch_timeline_id)
# we need to wait here, because the detaches will do an implicit tenant restart,
# and we could otherwise get unexpected layer counts
client.timeline_checkpoint(env.initial_tenant, branch_timeline_id, wait_until_uploaded=True)
assert len(list(filter(lambda x: x.l0, delta_layers(branch_timeline_id)))) == 1
skip_main = branches[1:]
branch_lsn = client.timeline_detail(env.initial_tenant, branch_timeline_id)["ancestor_lsn"]
# take the fullbackup before and after inheriting the new L0s
fullbackup_before = test_output_dir / "fullbackup-before.tar"
pg_bin.take_fullbackup(
env.pageserver, env.initial_tenant, branch_timeline_id, branch_lsn, fullbackup_before
)
# force initial logical sizes, so we can evict all layers from all
# timelines and exercise on-demand download for copy lsn prefix
client.timeline_detail(
env.initial_tenant, env.initial_timeline, force_await_initial_logical_size=True
)
client.evict_all_layers(env.initial_tenant, env.initial_timeline)
for _, timeline_id in skip_main:
reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert reparented == set(), "we have no earlier branches at any level"
post_detach_l0s = list(filter(lambda x: x.l0, delta_layers(branch_timeline_id)))
assert len(post_detach_l0s) == 5, "should have inherited 4 L0s, 5 in total"
# checkpoint triggers compaction, which in turn decides to run because
# there are now threshold-many L0s in total, even if they are not
# adjacent in Lsn space:
#
#  inherited   flushed during this checkpoint
#      \\\\  /
#      1234X5---> lsn
#          |
#          l1 layers from "fill in the final, most growing timeline"
#
# branch_lsn is between 4 and first X.
client.timeline_checkpoint(env.initial_tenant, branch_timeline_id)
post_compact_l0s = list(filter(lambda x: x.l0, delta_layers(branch_timeline_id)))
assert len(post_compact_l0s) == 1, "only the consecutive inherited L0s should be compacted"
fullbackup_after = test_output_dir / "fullbackup_after.tar"
pg_bin.take_fullbackup(
env.pageserver, env.initial_tenant, branch_timeline_id, branch_lsn, fullbackup_after
)
# we don't need to skip any files, because neon.signal will be identical
assert_pageserver_backups_equal(fullbackup_before, fullbackup_after, set())
@pytest.mark.parametrize("shards_initial_after", [(1, 1), (2, 2), (1, 4)])
def test_timeline_ancestor_detach_idempotent_success(
neon_env_builder: NeonEnvBuilder, shards_initial_after: tuple[int, int]
):
shards_initial = shards_initial_after[0]
shards_after = shards_initial_after[1]
neon_env_builder.num_pageservers = shards_after
env = neon_env_builder.init_start(
initial_tenant_shard_count=shards_initial if shards_initial > 1 else None,
initial_tenant_conf={
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": 512 * 1024,
"compaction_threshold": 1,
"compaction_target_size": 512 * 1024,
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
},
)
pageservers = dict((int(p.id), p) for p in env.pageservers)
for ps in pageservers.values():
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
if shards_after > 1:
# FIXME: should this be in the neon_env_builder.init_start?
env.storage_controller.reconcile_until_idle(timeout_secs=120)
client = env.storage_controller.pageserver_api()
else:
client = env.pageserver.http_client()
# Write some data so that we have some layers to copy
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as endpoint:
endpoint.safe_psql_many(
[
"CREATE TABLE foo(key serial primary key, t text default 'data_content')",
"INSERT INTO foo SELECT FROM generate_series(1,1024)",
]
)
last_flush_lsn_upload(env, endpoint, env.initial_tenant, env.initial_timeline)
first_branch = env.create_branch("first_branch")
_ = env.create_branch("second_branch", ancestor_branch_name="first_branch")
# these two will be reparented, and they should be returned in a stable order
# from the pageservers OR otherwise the storage controller will log an
# `error!`
reparented1 = env.create_branch("first_reparented", ancestor_branch_name="main")
reparented2 = env.create_branch("second_reparented", ancestor_branch_name="main")
if shards_after > shards_initial:
# Do a shard split
# This is a reproducer for https://github.com/neondatabase/neon/issues/9667
env.storage_controller.tenant_shard_split(env.initial_tenant, shards_after)
env.storage_controller.reconcile_until_idle(timeout_secs=120)
first_reparenting_response = client.detach_ancestor(env.initial_tenant, first_branch)
assert set(first_reparenting_response) == {reparented1, reparented2}
# FIXME: this should be done by the http req handler
for ps in pageservers.values():
ps.quiesce_tenants()
for _ in range(5):
# once completed, we can retry this any number of times
assert (
client.detach_ancestor(env.initial_tenant, first_branch) == first_reparenting_response
)
client.tenant_delete(env.initial_tenant)
with pytest.raises(PageserverApiException) as e:
client.detach_ancestor(env.initial_tenant, first_branch)
assert e.value.status_code == 404
@pytest.mark.parametrize("sharded", [True, False])
def test_timeline_ancestor_detach_errors(neon_env_builder: NeonEnvBuilder, sharded: bool):
# this test is split from test_timeline_ancestor_detach_idempotent_success as only these error cases should create "request was dropped before completing",
# given the current fail-on-first-error handling
shards = 2 if sharded else 1
neon_env_builder.num_pageservers = shards
env = neon_env_builder.init_start(
initial_tenant_shard_count=shards if sharded else None,
initial_tenant_conf={
# turn off gc, we want to do manual offloading here.
"gc_period": "0s",
},
)
pageservers = dict((int(p.id), p) for p in env.pageservers)
for ps in pageservers.values():
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
# We make /detach_ancestor requests that are intended to fail.
# It's expected that storcon drops requests to other pageservers after
# it gets the first error (https://github.com/neondatabase/neon/issues/11177)
ps.allowed_errors.extend(
[
".* WARN .* path=/v1/tenant/.*/timeline/.*/detach_ancestor request_id=.*: request was dropped before completing",
".* ERROR .* path=/v1/tenant/.*/timeline/.*/detach_ancestor request_id=.*: Cancelled request finished with an error.*",
]
)
client = (
env.pageserver.http_client() if not sharded else env.storage_controller.pageserver_api()
)
with pytest.raises(PageserverApiException, match=".* no ancestors") as info:
client.detach_ancestor(env.initial_tenant, env.initial_timeline)
assert info.value.status_code == 409
early_branch = env.create_branch("early_branch")
first_branch = env.create_branch("first_branch")
second_branch = env.create_branch("second_branch", ancestor_branch_name="first_branch")
# funnily enough this does not have a prefix
with pytest.raises(PageserverApiException, match="too many ancestors") as info:
client.detach_ancestor(env.initial_tenant, second_branch)
assert info.value.status_code == 400
client.timeline_archival_config(
env.initial_tenant, second_branch, TimelineArchivalState.ARCHIVED
)
client.timeline_archival_config(
env.initial_tenant, early_branch, TimelineArchivalState.ARCHIVED
)
with pytest.raises(PageserverApiException, match=f".*archived: {early_branch}") as info:
client.detach_ancestor(env.initial_tenant, first_branch)
assert info.value.status_code == 400
if not sharded:
client.timeline_offload(env.initial_tenant, early_branch)
client.timeline_archival_config(
env.initial_tenant, first_branch, TimelineArchivalState.ARCHIVED
)
with pytest.raises(PageserverApiException, match=f".*archived: {first_branch}") as info:
client.detach_ancestor(env.initial_tenant, first_branch)
assert info.value.status_code == 400
def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
"""
Sharded timeline detach ancestor; 4 nodes: 1 stuck, 1 restarted, 2 normal.
Stuck node gets stuck on a pause failpoint for first storage controller request.
Restarted node remains stuck until explicit restart from test code.
We retry the request until storage controller gets 200 OK from all nodes.
"""
branch_name = "soon_detached"
shard_count = 4
neon_env_builder.num_pageservers = shard_count
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
for ps in env.pageservers:
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
# FIXME: should this be in the neon_env_builder.init_start?
env.storage_controller.reconcile_until_idle()
# as we will stop a node, make sure there is no clever rebalancing
env.storage_controller.tenant_policy_update(env.initial_tenant, body={"scheduling": "Stop"})
env.storage_controller.allowed_errors.append(".*: Scheduling is disabled by policy Stop .*")
shards = env.storage_controller.locate(env.initial_tenant)
utilized_pageservers = {x["node_id"] for x in shards}
assert len(utilized_pageservers) > 1, "all shards got placed on a single pageserver?"
branch_timeline_id = env.create_branch(branch_name)
with env.endpoints.create_start(branch_name, tenant_id=env.initial_tenant) as ep:
ep.safe_psql(
"create table foo as select 1::bigint, i::bigint from generate_series(1, 10000) v(i)"
)
lsn = flush_ep_to_pageserver(env, ep, env.initial_tenant, branch_timeline_id)
pageservers = dict((int(p.id), p) for p in env.pageservers)
for shard_info in shards:
node_id = int(shard_info["node_id"])
shard_id = shard_info["shard_id"]
detail = pageservers[node_id].http_client().timeline_detail(shard_id, branch_timeline_id)
assert Lsn(detail["last_record_lsn"]) >= lsn
assert Lsn(detail["initdb_lsn"]) < lsn
assert TimelineId(detail["ancestor_timeline_id"]) == env.initial_timeline
# make one of the nodes get stuck, but continue the initial operation
# make another of the nodes get stuck, then restart
stuck = pageservers[int(shards[0]["node_id"])]
log.info(f"stuck pageserver is id={stuck.id}")
stuck_http = stuck.http_client()
stuck_http.configure_failpoints(
("timeline-detach-ancestor::before_starting_after_locking-pausable", "pause")
)
restarted = pageservers[int(shards[1]["node_id"])]
log.info(f"restarted pageserver is id={restarted.id}")
# this might be hit; see `restart_restarted`
restarted.allowed_errors.append(".*: Cancelled request finished with an error: ShuttingDown")
assert restarted.id != stuck.id
restarted_http = restarted.http_client()
restarted_http.configure_failpoints(
[
("timeline-detach-ancestor::before_starting_after_locking-pausable", "pause"),
]
)
for info in shards:
pageserver = pageservers[int(info["node_id"])]
# the first request can cause these, but not repeatedly
pageserver.allowed_errors.append(".*: request was dropped before completing")
# first request again
env.storage_controller.allowed_errors.append(".*: request was dropped before completing")
target = env.storage_controller.pageserver_api()
with pytest.raises(ReadTimeout):
target.detach_ancestor(env.initial_tenant, branch_timeline_id, timeout=1)
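# the client-side timeout fires while two shards are paused at the failpoint; the
# pageservers keep processing the detach in the background (see the note further below)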
stuck_http.configure_failpoints(
("timeline-detach-ancestor::before_starting_after_locking-pausable", "off")
)
barrier = threading.Barrier(2)
def restart_restarted():
barrier.wait()
# graceful shutdown should just work, because it is simultaneously unpaused
restarted.stop()
# this does not always happen; it depends on how fast we exit after unpausing
# restarted.assert_log_contains("Cancelled request finished with an error: ShuttingDown")
restarted.start()
with ThreadPoolExecutor(max_workers=1) as pool:
fut = pool.submit(restart_restarted)
barrier.wait()
# we have 10s, let's use half of that to help the shutdown start
time.sleep(5)
restarted_http.configure_failpoints(
("timeline-detach-ancestor::before_starting_after_locking-pausable", "off")
)
fut.result()
# detach ancestor request handling is not sensitive to http cancellation.
# this means that the "stuck" node is on its way to complete the detach, but the restarted one was cut off
# now it can either be complete on all nodes, or still in progress with
# one.
without_retrying = target.without_status_retrying()
# this retry loop will be long enough that the tenant can always activate
reparented = None
for _ in range(10):
try:
reparented = without_retrying.detach_ancestor(env.initial_tenant, branch_timeline_id)
except PageserverApiException as info:
assert info.status_code == 503
time.sleep(2)
else:
break
assert reparented == set(), "too many retries (None) or unexpected reparentings"
for shard_info in shards:
node_id = int(shard_info["node_id"])
shard_id = shard_info["shard_id"]
# TODO: ensure quiescing is done on pageserver?
pageservers[node_id].quiesce_tenants()
detail = pageservers[node_id].http_client().timeline_detail(shard_id, branch_timeline_id)
wait_for_last_record_lsn(
pageservers[node_id].http_client(), shard_id, branch_timeline_id, lsn
)
assert detail.get("ancestor_timeline_id") is None
with env.endpoints.create_start(branch_name, tenant_id=env.initial_tenant) as ep:
count = int(ep.safe_psql("select count(*) from foo")[0][0])
assert count == 10000
@pytest.mark.parametrize(
"mode, sharded",
[
("delete_timeline", False),
("delete_timeline", True),
("delete_tenant", False),
# the shared/exclusive lock for tenant is blocking this:
# timeline detach ancestor takes shared, delete tenant takes exclusive
# ("delete_tenant", True)
],
)
def test_timeline_detach_ancestor_interrupted_by_deletion(
neon_env_builder: NeonEnvBuilder, mode: str, sharded: bool
):
"""
Timeline ancestor detach interrupted by deleting either:
- the detached timeline
- the whole tenant
after starting the detach.
What remains not tested by this:
- shutdown winning over complete, see test_timeline_is_deleted_before_timeline_detach_ancestor_completes
"""
shard_count = 2 if sharded else 1
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start(
initial_tenant_shard_count=shard_count if sharded else None,
initial_tenant_conf={
"gc_period": "1s",
"lsn_lease_length": "0s",
},
)
for ps in env.pageservers:
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
ps.allowed_errors.extend(
[".*Timeline.* has been deleted.*", ".*Timeline.*was cancelled and cannot be used"]
)
pageservers = dict((int(p.id), p) for p in env.pageservers)
detached_timeline = env.create_branch("detached soon", ancestor_branch_name="main")
pausepoint = "timeline-detach-ancestor::before_starting_after_locking-pausable"
env.storage_controller.reconcile_until_idle()
shards = env.storage_controller.locate(env.initial_tenant)
assert len(set(info["node_id"] for info in shards)) == shard_count
target = env.storage_controller.pageserver_api() if sharded else env.pageserver.http_client()
target = target.without_status_retrying()
victim = pageservers[int(shards[-1]["node_id"])]
victim_http = victim.http_client()
victim_http.configure_failpoints((pausepoint, "pause"))
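# the victim shard pauses right after taking the detach lock (per the failpoint name),
# giving the deletion a chance to race with the in-progress detach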
def detach_ancestor():
target.detach_ancestor(env.initial_tenant, detached_timeline)
def at_failpoint() -> LogCursor:
msg, offset = victim.assert_log_contains(f"at failpoint {pausepoint}")
log.info(f"found {msg}")
msg, offset = victim.assert_log_contains(
".* gc_loop.*: Skipping GC: .*",
offset,
)
log.info(f"found {msg}")
return offset
def start_delete():
if mode == "delete_timeline":
target.timeline_delete(env.initial_tenant, detached_timeline)
elif mode == "delete_tenant":
target.tenant_delete(env.initial_tenant)
else:
raise RuntimeError(f"unimplemented mode {mode}")
def at_waiting_on_gate_close(start_offset: LogCursor) -> LogCursor:
_, offset = victim.assert_log_contains(
"closing is taking longer than expected", offset=start_offset
)
return offset
def is_deleted():
try:
if mode == "delete_timeline":
target.timeline_detail(env.initial_tenant, detached_timeline)
elif mode == "delete_tenant":
target.tenant_status(env.initial_tenant)
else:
return False
except PageserverApiException as e:
assert e.status_code == 404
return True
else:
raise RuntimeError("waiting for 404")
with ThreadPoolExecutor(max_workers=2) as pool:
try:
fut = pool.submit(detach_ancestor)
offset = wait_until(at_failpoint)
delete = pool.submit(start_delete)
offset = wait_until(lambda: at_waiting_on_gate_close(offset))
victim_http.configure_failpoints((pausepoint, "off"))
delete.result()
assert wait_until(is_deleted), f"unimplemented mode {mode}"
# TODO: match the error
with pytest.raises(PageserverApiException) as exc:
fut.result()
log.info(f"TODO: match this error: {exc.value}")
assert exc.value.status_code == 503
finally:
victim_http.configure_failpoints((pausepoint, "off"))
if mode != "delete_timeline":
return
# make sure the gc is unblocked
time.sleep(2)
victim.assert_log_contains(".* gc_loop.*: 1 timelines need GC", offset)
if not sharded:
# we have the other node only while sharded
return
other = pageservers[int(shards[0]["node_id"])]
log.info(f"other is {other.id}")
_, offset = other.assert_log_contains(
".*INFO request\\{method=PUT path=/v1/tenant/\\S+/timeline/\\S+/detach_ancestor .*\\}: Request handled, status: 200 OK",
)
# this might be a lot earlier than the victim's line, but that is okay.
_, offset = other.assert_log_contains(".* gc_loop.*: 1 timelines need GC", offset)
@pytest.mark.parametrize("mode", ["delete_reparentable_timeline", "create_reparentable_timeline"])
def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnvBuilder, mode: str):
"""
Exercises a technically possible storage controller interleaving of a concurrent
timeline deletion (or creation) with a timeline detach.
Deletion is fine, as the sharded pageservers all reach the same end state, but
creating a reparentable timeline would create an issue, as the two nodes would
never agree. There is a solution though: the created reparentable timeline
must itself be detached.
"""
shard_count = 2
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
for ps in env.pageservers:
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
ps.allowed_errors.extend(
[".*Timeline.* has been deleted.*", ".*Timeline.*was cancelled and cannot be used"]
)
pageservers = dict((int(p.id), p) for p in env.pageservers)
env.storage_controller.reconcile_until_idle()
shards = env.storage_controller.locate(env.initial_tenant)
assert len(set(x["node_id"] for x in shards)) == shard_count
with env.endpoints.create_start("main") as ep:
ep.safe_psql("create table foo as select i::bigint from generate_series(1, 1000) t(i)")
# as the interleaved operation, we will delete this timeline, which was a reparenting candidate
first_branch_lsn = wait_for_last_flush_lsn(
env, ep, env.initial_tenant, env.initial_timeline
)
for ps, shard_id in [(pageservers[int(x["node_id"])], x["shard_id"]) for x in shards]:
ps.http_client().timeline_checkpoint(shard_id, env.initial_timeline)
ep.safe_psql("create table bar as select i::bigint from generate_series(1, 2000) t(i)")
detached_branch_lsn = flush_ep_to_pageserver(
env, ep, env.initial_tenant, env.initial_timeline
)
for ps, shard_id in [(pageservers[int(x["node_id"])], x["shard_id"]) for x in shards]:
ps.http_client().timeline_checkpoint(shard_id, env.initial_timeline)
def create_reparentable_timeline() -> TimelineId:
return env.create_branch(
"first_branch", ancestor_branch_name="main", ancestor_start_lsn=first_branch_lsn
)
if mode == "delete_reparentable_timeline":
first_branch = create_reparentable_timeline()
else:
first_branch = None
detached_branch = env.create_branch(
"detached_branch", ancestor_branch_name="main", ancestor_start_lsn=detached_branch_lsn
)
pausepoint = "timeline-detach-ancestor::before_starting_after_locking-pausable"
stuck = pageservers[int(shards[0]["node_id"])]
stuck_http = stuck.http_client().without_status_retrying()
stuck_http.configure_failpoints((pausepoint, "pause"))
victim = pageservers[int(shards[-1]["node_id"])]
victim_http = victim.http_client().without_status_retrying()
victim_http.configure_failpoints(
(pausepoint, "pause"),
)
# interleaving a create_timeline which could be reparented will produce two
# permanently different reparentings: one node has reparented, other has
# not
#
# with deletion there is no such problem
def detach_timeline():
env.storage_controller.pageserver_api().detach_ancestor(env.initial_tenant, detached_branch)
def paused_at_failpoint():
stuck.assert_log_contains(f"at failpoint {pausepoint}")
victim.assert_log_contains(f"at failpoint {pausepoint}")
def first_completed():
detail = stuck_http.timeline_detail(shards[0]["shard_id"], detached_branch)
log.info(detail)
assert detail.get("ancestor_lsn") is None
def first_branch_gone():
assert first_branch is not None
try:
env.storage_controller.pageserver_api().timeline_detail(
env.initial_tenant, first_branch
)
except PageserverApiException as e:
log.info(f"error {e}")
assert e.status_code == 404
else:
log.info("still ok")
raise RuntimeError("not done yet")
with ThreadPoolExecutor(max_workers=1) as pool:
try:
fut = pool.submit(detach_timeline)
wait_until(paused_at_failpoint)
# let stuck complete
stuck_http.configure_failpoints((pausepoint, "off"))
wait_until(first_completed)
if mode == "delete_reparentable_timeline":
assert first_branch is not None
env.storage_controller.pageserver_api().timeline_delete(
env.initial_tenant, first_branch
)
victim_http.configure_failpoints((pausepoint, "off"))
wait_until(first_branch_gone)
elif mode == "create_reparentable_timeline":
first_branch = create_reparentable_timeline()
victim_http.configure_failpoints((pausepoint, "off"))
else:
raise RuntimeError("{mode}")
# it now passes, and we should get an error message about mixed reparenting as the stuck node still had something to reparent
mixed_results = "pageservers returned mixed results for ancestor detach; manual intervention is required."
with pytest.raises(PageserverApiException, match=mixed_results):
fut.result()
msg, offset = env.storage_controller.assert_log_contains(
".*/timeline/\\S+/detach_ancestor.*: shards returned different results matching=0 .*"
)
log.info(f"expected error message: {msg.rstrip()}")
env.storage_controller.allowed_errors.extend(
[
".*: shards returned different results matching=0 .*",
f".*: InternalServerError\\({mixed_results}",
]
)
if mode == "create_reparentable_timeline":
with pytest.raises(PageserverApiException, match=mixed_results):
detach_timeline()
else:
# it is a bit of a shame to flag it and then have it succeed, but most
# likely there would be a retry loop which would take care of
# this in cplane
detach_timeline()
retried = env.storage_controller.log_contains(
".*/timeline/\\S+/detach_ancestor.*: shards returned different results matching=0 .*",
offset,
)
if mode == "delete_reparentable_timeline":
assert retried is None, (
"detaching should had converged after both nodes saw the deletion"
)
elif mode == "create_reparentable_timeline":
assert retried is not None, "detaching should not have converged"
_, offset = retried
finally:
stuck_http.configure_failpoints((pausepoint, "off"))
victim_http.configure_failpoints((pausepoint, "off"))
if mode == "create_reparentable_timeline":
assert first_branch is not None
# now we have mixed ancestry
assert (
TimelineId(
stuck_http.timeline_detail(shards[0]["shard_id"], first_branch)[
"ancestor_timeline_id"
]
)
== env.initial_timeline
)
assert (
TimelineId(
victim_http.timeline_detail(shards[-1]["shard_id"], first_branch)[
"ancestor_timeline_id"
]
)
== detached_branch
)
# make sure we are still able to repair this by detaching the ancestor via the storage controller in case it ever happens
# if the ancestor were deleted instead, we would partially fail, leaving the deletion stuck.
env.storage_controller.pageserver_api().detach_ancestor(env.initial_tenant, first_branch)
# and we should now have good results
not_found = env.storage_controller.log_contains(
".*/timeline/\\S+/detach_ancestor.*: shards returned different results matching=0 .*",
offset,
)
assert not_found is None
assert (
stuck_http.timeline_detail(shards[0]["shard_id"], first_branch)["ancestor_timeline_id"]
is None
)
assert (
victim_http.timeline_detail(shards[-1]["shard_id"], first_branch)[
"ancestor_timeline_id"
]
is None
)
@pytest.mark.parametrize("detach_behavior", ["default", "v1", "v2"])
def test_retryable_500_hit_through_storcon_during_timeline_detach_ancestor(
neon_env_builder: NeonEnvBuilder,
detach_behavior: str,
):
shard_count = 2
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
for ps in env.pageservers:
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
pageservers = dict((int(p.id), p) for p in env.pageservers)
env.storage_controller.reconcile_until_idle()
shards = env.storage_controller.locate(env.initial_tenant)
assert len(set(x["node_id"] for x in shards)) == shard_count
detached_branch = env.create_branch("detached_branch", ancestor_branch_name="main")
pausepoint = "timeline-detach-ancestor::before_starting_after_locking-pausable"
failpoint = "timeline-detach-ancestor::before_starting_after_locking"
stuck = pageservers[int(shards[0]["node_id"])]
stuck_http = stuck.http_client().without_status_retrying()
stuck_http.configure_failpoints(
(pausepoint, "pause"),
)
env.storage_controller.allowed_errors.append(
f".*Error processing HTTP request: .* failpoint: {failpoint}"
)
http = env.storage_controller.pageserver_api()
victim = pageservers[int(shards[-1]["node_id"])]
victim.allowed_errors.append(
f".*Error processing HTTP request: InternalServerError\\(failpoint: {failpoint}"
)
victim_http = victim.http_client().without_status_retrying()
victim_http.configure_failpoints([(pausepoint, "pause"), (failpoint, "return")])
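# the stuck shard only pauses and will complete; the victim additionally returns an error
# from the failpoint once unpaused, which surfaces as a retryable 500 via the storage controller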
def detach_timeline():
http.detach_ancestor(
env.initial_tenant,
detached_branch,
detach_behavior=detach_behavior if detach_behavior != "default" else None,
)
def paused_at_failpoint():
stuck.assert_log_contains(f"at failpoint {pausepoint}")
victim.assert_log_contains(f"at failpoint {pausepoint}")
def first_completed():
detail = stuck_http.timeline_detail(shards[0]["shard_id"], detached_branch)
log.info(detail)
assert detail.get("ancestor_lsn") is None
with ThreadPoolExecutor(max_workers=1) as pool:
try:
fut = pool.submit(detach_timeline)
wait_until(paused_at_failpoint)
# let stuck complete
stuck_http.configure_failpoints((pausepoint, "off"))
wait_until(first_completed)
victim_http.configure_failpoints((pausepoint, "off"))
with pytest.raises(
PageserverApiException,
match=f".*failpoint: {failpoint}",
) as exc:
fut.result()
assert exc.value.status_code == 500
finally:
stuck_http.configure_failpoints((pausepoint, "off"))
victim_http.configure_failpoints((pausepoint, "off"))
victim_http.configure_failpoints((failpoint, "off"))
detach_timeline()
def test_retried_detach_ancestor_after_failed_reparenting(neon_env_builder: NeonEnvBuilder):
"""
Using a failpoint, force the completion step of timeline ancestor detach to
fail after reparenting a single timeline.
Retrying should keep reparenting until all reparentings are done, blocking gc
the whole time, even across restarts (first round).
A completion failpoint is used to inhibit completion on the second-to-last
round.
On last round, the completion uses a path where no reparentings can happen
because original ancestor is deleted, and there is a completion to unblock
gc without restart.
"""
# to get the remote storage metrics
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
env = neon_env_builder.init_start(
initial_tenant_conf={
"gc_period": "1s",
"lsn_lease_length": "0s",
}
)
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
env.pageserver.allowed_errors.extend(
[
".* reparenting failed: failpoint: timeline-detach-ancestor::allow_one_reparented",
".* Error processing HTTP request: InternalServerError\\(failed to reparent all candidate timelines, please retry",
".* Error processing HTTP request: InternalServerError\\(failpoint: timeline-detach-ancestor::complete_before_uploading",
]
)
http = env.pageserver.http_client()
def remote_storage_copy_requests():
return http.get_metric_value(
"remote_storage_s3_request_seconds_count",
{"request_type": "copy_object", "result": "ok"},
)
def reparenting_progress(timelines: list[TimelineId]) -> tuple[int, set[TimelineId]]:
reparented = 0
not_reparented = set()
for timeline in timelines:
detail = http.timeline_detail(env.initial_tenant, timeline)
ancestor = TimelineId(detail["ancestor_timeline_id"])
if ancestor == detached:
reparented += 1
else:
not_reparented.add(timeline)
return (reparented, not_reparented)
# main ------A-----B-----C-----D-----E> lsn
timelines = []
with env.endpoints.create_start("main") as ep:
for counter in range(5):
ep.safe_psql(
f"create table foo_{counter} as select i::bigint from generate_series(1, 10000) t(i)"
)
branch_lsn = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
http.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
branch = env.create_branch(
f"branch_{counter}", ancestor_branch_name="main", ancestor_start_lsn=branch_lsn
)
timelines.append(branch)
flush_ep_to_pageserver(env, ep, env.initial_tenant, env.initial_timeline)
# detach "E" which has most reparentable timelines under it
detached = timelines.pop()
assert len(timelines) == 4
http = http.without_status_retrying()
http.configure_failpoints(("timeline-detach-ancestor::allow_one_reparented", "return"))
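# the failpoint lets only one reparenting succeed per attempt, so each retried detach
# makes progress by exactly one timeline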
not_reparented: set[TimelineId] = set()
# tracks an offset in the pageserver log which is at least at the most recent activation
offset = None
def try_detach():
with pytest.raises(
PageserverApiException,
match=".*failed to reparent all candidate timelines, please retry",
) as exc:
http.detach_ancestor(env.initial_tenant, detached)
assert exc.value.status_code == 503
# first round -- do more checking to make sure the gc gets paused
try_detach()
assert http.timeline_detail(env.initial_tenant, detached)["ancestor_timeline_id"] is None, (
"first round should had detached 'detached'"
)
reparented, not_reparented = reparenting_progress(timelines)
assert reparented == 1
time.sleep(2)
_, offset = env.pageserver.assert_log_contains(
".*INFO request\\{method=PUT path=/v1/tenant/[0-9a-f]{32}/timeline/[0-9a-f]{32}/detach_ancestor .*\\}: Handling request",
offset,
)
_, offset = env.pageserver.assert_log_contains(".*: attach finished, activating", offset)
_, offset = env.pageserver.assert_log_contains(
".* gc_loop.*: Skipping GC: .*",
offset,
)
metric = remote_storage_copy_requests()
assert metric != 0
# make sure the gc blocking is persistent over a restart
env.pageserver.restart()
env.pageserver.quiesce_tenants()
time.sleep(2)
_, offset = env.pageserver.assert_log_contains(".*: attach finished, activating", offset)
assert env.pageserver.log_contains(".* gc_loop.*: [0-9] timelines need GC", offset) is None
_, offset = env.pageserver.assert_log_contains(
".* gc_loop.*: Skipping GC: .*",
offset,
)
# restore failpoint for the next reparented
http.configure_failpoints(("timeline-detach-ancestor::allow_one_reparented", "return"))
reparented_before = reparented
# do two more rounds
for _ in range(2):
try_detach()
assert http.timeline_detail(env.initial_tenant, detached)["ancestor_timeline_id"] is None, (
"first round should had detached 'detached'"
)
reparented, not_reparented = reparenting_progress(timelines)
assert reparented == reparented_before + 1
reparented_before = reparented
_, offset = env.pageserver.assert_log_contains(".*: attach finished, activating", offset)
metric = remote_storage_copy_requests()
assert metric == 0, "copies happen in the first round"
assert offset is not None
assert len(not_reparented) == 1
http.configure_failpoints(("timeline-detach-ancestor::complete_before_uploading", "return"))
# almost final round: the failpoint is no longer hit, as there is only one timeline left to reparent and one reparenting always gets to succeed.
# the tenant is restarted once more, but we fail while completing.
with pytest.raises(
PageserverApiException, match=".* timeline-detach-ancestor::complete_before_uploading"
) as exc:
http.detach_ancestor(env.initial_tenant, detached)
assert exc.value.status_code == 500
_, offset = env.pageserver.assert_log_contains(".*: attach finished, activating", offset)
# delete the previous ancestor to take a different path to completion. all
# other tests take the "detach? reparent, complete" path, but this one only hits
# "complete".
http.timeline_delete(env.initial_tenant, env.initial_timeline)
wait_timeline_detail_404(http, env.initial_tenant, env.initial_timeline)
http.configure_failpoints(("timeline-detach-ancestor::complete_before_uploading", "off"))
reparented_resp = http.detach_ancestor(env.initial_tenant, detached)
assert reparented_resp == set(timelines)
# no need to quiesce_tenants anymore, because completion does that
reparented, not_reparented = reparenting_progress(timelines)
assert reparented == len(timelines)
time.sleep(2)
assert env.pageserver.log_contains(".*: attach finished, activating", offset) is None, (
"there should be no restart with the final detach_ancestor as it only completed"
)
# gc is unblocked
env.pageserver.assert_log_contains(".* gc_loop.*: 5 timelines need GC", offset)
metric = remote_storage_copy_requests()
assert metric == 0
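

# A minimal sketch, not used by the tests in this file, of how a caller could drive the
# "failed to reparent all candidate timelines, please retry" contract exercised above:
# keep retrying detach_ancestor while the pageserver answers 503. The helper name and the
# `max_attempts` knob are illustrative assumptions, not an existing fixture API; `http` is
# assumed to be a pageserver HTTP client as used throughout this file.
def _detach_ancestor_with_retries(http, tenant_id, timeline_id, max_attempts: int = 10):
    for attempt in range(max_attempts):
        try:
            # on success the endpoint returns the set of reparented timeline ids
            return http.detach_ancestor(tenant_id, timeline_id)
        except PageserverApiException as e:
            if e.status_code != 503:
                raise
            log.info(f"detach_ancestor attempt {attempt + 1} was asked to retry")
    raise RuntimeError(f"detach_ancestor did not complete within {max_attempts} attempts")
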
def test_timeline_is_deleted_before_timeline_detach_ancestor_completes(
neon_env_builder: NeonEnvBuilder,
):
"""
    Make sure that a timeline deleted after the restart, but before detach_ancestor
    completes, will unblock gc.
"""
env = neon_env_builder.init_start(
initial_tenant_conf={
"gc_period": "1s",
"lsn_lease_length": "0s",
}
)
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
http = env.pageserver.http_client()
detached = env.create_branch("detached")
failpoint = "timeline-detach-ancestor::after_activating_before_finding-pausable"
http.configure_failpoints((failpoint, "pause"))
def detach_and_get_stuck():
return http.detach_ancestor(env.initial_tenant, detached)
def request_processing_noted_in_log():
_, offset = env.pageserver.assert_log_contains(
".*INFO request\\{method=PUT path=/v1/tenant/[0-9a-f]{32}/timeline/[0-9a-f]{32}/detach_ancestor .*\\}: Handling request",
)
return offset
def delete_detached():
return http.timeline_delete(env.initial_tenant, detached)
try:
with ThreadPoolExecutor(max_workers=1) as pool:
detach = pool.submit(detach_and_get_stuck)
offset = wait_until(request_processing_noted_in_log)
            # use a named fn so failures show up more clearly in the test output log
def pausepoint_hit_with_gc_paused() -> LogCursor:
env.pageserver.assert_log_contains(f"at failpoint {failpoint}")
_, at = env.pageserver.assert_log_contains(
".* gc_loop.*: Skipping GC: .*",
offset,
)
return at
offset = wait_until(pausepoint_hit_with_gc_paused)
delete_detached()
wait_timeline_detail_404(http, env.initial_tenant, detached)
http.configure_failpoints((failpoint, "off"))
with pytest.raises(
PageserverApiException, match="NotFound: Timeline .* was not found"
) as exc:
detach.result()
assert exc.value.status_code == 404
finally:
http.configure_failpoints((failpoint, "off"))
# make sure gc has been unblocked
time.sleep(2)
env.pageserver.assert_log_contains(".* gc_loop.*: 1 timelines need GC", offset)
@skip_in_debug_build("only run with release build")
def test_pageserver_compaction_detach_ancestor_smoke(neon_env_builder: NeonEnvBuilder):
SMOKE_CONF = {
# Run both gc and gc-compaction.
"gc_period": "5s",
"compaction_period": "5s",
# No PiTR interval and small GC horizon
"pitr_interval": "0s",
"gc_horizon": f"{1024**2}",
"lsn_lease_length": "0s",
# Small checkpoint distance to create many layers
"checkpoint_distance": 1024**2,
# Compact small layers
"compaction_target_size": 1024**2,
"image_creation_threshold": 2,
}
env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
row_count = 10000
churn_rounds = 50
ps_http = env.pageserver.http_client()
workload_parent = Workload(env, tenant_id, timeline_id)
workload_parent.init(env.pageserver.id)
log.info("Writing initial data ...")
workload_parent.write_rows(row_count, env.pageserver.id)
branch_id = env.create_branch("child")
workload_child = Workload(env, tenant_id, branch_id, branch_name="child")
workload_child.init(env.pageserver.id, allow_recreate=True)
log.info("Writing initial data on child...")
workload_child.write_rows(row_count, env.pageserver.id)
for i in range(1, churn_rounds + 1):
if i % 10 == 0:
log.info(f"Running churn round {i}/{churn_rounds} ...")
workload_parent.churn_rows(row_count, env.pageserver.id)
workload_child.churn_rows(row_count, env.pageserver.id)
ps_http.detach_ancestor(tenant_id, branch_id)
log.info("Validating at workload end ...")
workload_parent.validate(env.pageserver.id)
workload_child.validate(env.pageserver.id)
def test_timeline_detach_with_aux_files_with_detach_v1(
neon_env_builder: NeonEnvBuilder,
):
"""
Validate that "branches do not inherit their parent" is invariant over detach_ancestor.
Branches hide parent branch aux files etc by stopping lookup of non-inherited keyspace at the parent-child boundary.
We had a bug where detach_ancestor running on a child branch would copy aux files key range from child to parent,
thereby making parent aux files reappear.
"""
env = neon_env_builder.init_start(
initial_tenant_conf={
"gc_period": "1s",
"lsn_lease_length": "0s",
}
)
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main", tenant_id=env.initial_tenant)
lsn0 = wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
endpoint.safe_psql(
"SELECT pg_create_logical_replication_slot('test_slot_parent_1', 'pgoutput')"
)
lsn1 = wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
endpoint.safe_psql(
"SELECT pg_create_logical_replication_slot('test_slot_parent_2', 'pgoutput')"
)
lsn2 = wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn0).keys()) == set(
[]
)
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn1).keys()) == set(
["pg_replslot/test_slot_parent_1/state"]
)
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn2).keys()) == set(
["pg_replslot/test_slot_parent_1/state", "pg_replslot/test_slot_parent_2/state"]
)
# Restore at LSN1
branch_timeline_id = env.create_branch("restore", env.initial_tenant, "main", lsn1)
endpoint2 = env.endpoints.create_start("restore", tenant_id=env.initial_tenant)
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn1).keys()) == set([])
    # Add a new slot file to the restore branch. (This won't happen in reality because cplane
    # immediately detaches the branch on restore, but we want to ensure that aux files are NOT
    # inherited onto the detached branch during ancestor detach. We could change the behavior
    # in the future.)
    # TL;DR: we should NEVER automatically detach a branch as a background optimization for
    # tenants that already used the restore feature before branch detach was introduced,
    # because it will clean up the aux files and stop logical replication.
endpoint2.safe_psql(
"SELECT pg_create_logical_replication_slot('test_slot_restore', 'pgoutput')"
)
lsn3 = wait_for_last_flush_lsn(env, endpoint2, env.initial_tenant, branch_timeline_id)
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn1).keys()) == set([])
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn3).keys()) == set(
["pg_replslot/test_slot_restore/state"]
)
print("lsn0=", lsn0)
print("lsn1=", lsn1)
print("lsn2=", lsn2)
print("lsn3=", lsn3)
# Detach the restore branch so that main doesn't have any child branches.
all_reparented = http.detach_ancestor(
env.initial_tenant, branch_timeline_id, detach_behavior="v1"
)
assert all_reparented == set([])
# We need to ensure all safekeeper data are ingested before checking aux files: the API does not wait for LSN.
wait_for_last_flush_lsn(env, endpoint2, env.initial_tenant, branch_timeline_id)
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn2).keys()) == set(
["pg_replslot/test_slot_parent_1/state", "pg_replslot/test_slot_parent_2/state"]
), "main branch unaffected"
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn3).keys()) == set(
["pg_replslot/test_slot_restore/state"]
)
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn1).keys()) == set([])
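

# A minimal sketch, assuming `http` is a pageserver HTTP client and the child timeline was
# branched at `branch_lsn`, of the invariant the test above relies on: at its branch point a
# child must not see any of the parent's aux files, because the aux-file keyspace is not
# inherited across the parent-child boundary. The helper name is illustrative, not an
# existing fixture.
def _assert_no_inherited_aux_files(http, tenant_id, child_timeline_id, branch_lsn):
    inherited = set(http.list_aux_files(tenant_id, child_timeline_id, branch_lsn).keys())
    assert inherited == set(), f"child unexpectedly sees parent aux files: {inherited}"
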
def test_detach_ancestors_with_no_writes(
neon_env_builder: NeonEnvBuilder,
):
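    """
    Build a chain of branches that receive no writes of their own, detaching each new branch
    from its ancestor right after creating it.
    """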
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main", tenant_id=env.initial_tenant)
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
endpoint.safe_psql(
"SELECT pg_create_logical_replication_slot('test_slot_parent_1', 'pgoutput')"
)
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
endpoint.stop()
for i in range(0, 5):
if i == 0:
ancestor_name = "main"
else:
ancestor_name = f"b{i}"
tlid = env.create_branch(f"b{i + 1}", ancestor_branch_name=ancestor_name)
client = env.pageserver.http_client()
client.detach_ancestor(tenant_id=env.initial_tenant, timeline_id=tlid)
# TODO:
# - branch near an existing L1 boundary, image layers?
# - investigate: why do layers start at uneven lsns? not just after branching, but in general.
#
# TEST:
# 1. tad (timeline detach ancestor) which partially succeeds, one returns 500
# 2. create branch below timeline? ~or delete reparented timeline~ (done)
# 3. on retry, all should report the same reparented timelines