Merge branch 'problame/lease-deadline-tests' into problame/standby-horizon-removal-poc-rip-out

This commit is contained in:
Christian Schwarz
2025-07-02 16:02:29 +02:00
70 changed files with 2252 additions and 557 deletions

View File

@@ -453,7 +453,7 @@ class NeonEnvBuilder:
pageserver_get_vectored_concurrent_io: str | None = None,
pageserver_tracing_config: PageserverTracingConfig | None = None,
pageserver_import_config: PageserverImportConfig | None = None,
storcon_kick_secondary_downloads: bool | None = None,
storcon_kick_secondary_downloads: bool | None = True,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -724,15 +724,21 @@ class NeonEnvBuilder:
shutil.copytree(storcon_db_from_dir, storcon_db_to_dir, ignore=ignore_postgres_log)
assert not (storcon_db_to_dir / "postgres.log").exists()
# NB: neon_local rewrites postgresql.conf on each start based on neon_local config. No need to patch it.
# However, in this new NeonEnv, the pageservers listen on different ports, and the storage controller
# will currently reject re-attach requests from them because the NodeMetadata isn't identical.
# However, in this new NeonEnv, the pageservers and safekeepers listen on different ports, and the storage
# controller will currently reject re-attach requests from them because the NodeMetadata isn't identical.
# So, from_repo_dir patches up the the storcon database.
patch_script_path = self.repo_dir / "storage_controller_db.startup.sql"
assert not patch_script_path.exists()
patch_script = ""
for ps in self.env.pageservers:
patch_script += f"UPDATE nodes SET listen_http_port={ps.service_port.http}, listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';"
patch_script += f"UPDATE nodes SET listen_http_port={ps.service_port.http}, listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';\n"
for sk in self.env.safekeepers:
patch_script += f"UPDATE safekeepers SET http_port={sk.port.http}, port={sk.port.pg} WHERE id = '{sk.id}';\n"
patch_script_path.write_text(patch_script)
# Update the config with info about tenants and timelines
@@ -1215,6 +1221,13 @@ class NeonEnv:
storage_controller_config = storage_controller_config or {}
storage_controller_config["use_https_safekeeper_api"] = True
# TODO(diko): uncomment when timeline_safekeeper_count option is in the release branch,
# so the compat tests will not fail bacause of it presence.
# if config.num_safekeepers < 3:
# storage_controller_config = storage_controller_config or {}
# if "timeline_safekeeper_count" not in storage_controller_config:
# storage_controller_config["timeline_safekeeper_count"] = config.num_safekeepers
if storage_controller_config is not None:
cfg["storage_controller"] = storage_controller_config
@@ -2226,6 +2239,21 @@ class NeonStorageController(MetricsGetter, LogUtils):
response.raise_for_status()
log.info(f"timeline_create success: {response.json()}")
def migrate_safekeepers(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
new_sk_set: list[int],
):
response = self.request(
"POST",
f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate",
json={"new_sk_set": new_sk_set},
headers=self.headers(TokenScope.PAGE_SERVER_API),
)
response.raise_for_status()
log.info(f"migrate_safekeepers success: {response.json()}")
def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]:
"""
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr": str, "listen_http_port": int}

View File

@@ -695,6 +695,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
tenant_id: TenantId | TenantShardId,
timeline_id: TimelineId,
gc_horizon: int | None,
ignore_lease_deadline: bool | None = None,
) -> dict[str, Any]:
"""
Unlike most handlers, this will wait for the layers to be actually
@@ -707,7 +708,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
)
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/do_gc",
json={"gc_horizon": gc_horizon},
json={"gc_horizon": gc_horizon, "ignore_lease_deadline": ignore_lease_deadline},
)
log.info(f"Got GC request response code: {res.status_code}")
self.verbose_error(res)

View File

@@ -76,6 +76,7 @@ if TYPE_CHECKING:
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
# export COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE}
# export COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install
# export COMPATIBILITY_SNAPSHOT_DIR=test_output/compatibility_snapshot_pgv${DEFAULT_PG_VERSION}
#
# # Build previous version of binaries and store them somewhere:
# rm -rf pg_install target
@@ -102,6 +103,7 @@ if TYPE_CHECKING:
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
# export COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE}
# export COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install
# export COMPATIBILITY_SNAPSHOT_DIR=test_output/compatibility_snapshot_pgv${DEFAULT_PG_VERSION}
# export NEON_BIN=target/${BUILD_TYPE}
# export POSTGRES_DISTRIB_DIR=pg_install
#

View File

@@ -201,11 +201,13 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
for shard, ps in tenant_get_shards(env, env.initial_tenant):
client = ps.http_client()
layers_guarded_before_gc = get_layers_protected_by_lease(
client, shard, env.initial_timeline, lease_lsn=lsn
client, shard, env.initial_timeline, lease_lsn=lease_lsn
)
gc_result = client.timeline_gc(
shard, env.initial_timeline, 0, ignore_lease_deadline=False
)
gc_result = client.timeline_gc(shard, env.initial_timeline, 0)
layers_guarded_after_gc = get_layers_protected_by_lease(
client, shard, env.initial_timeline, lease_lsn=lsn
client, shard, env.initial_timeline, lease_lsn=lease_lsn
)
# Note: cannot assert on `layers_removed` here because it could be layers

View File

@@ -0,0 +1,76 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder
def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
"""
Simple safekeeper migration test.
Creates 3 safekeepers. The timeline is configuret to use only one safekeeper.
1. Go through all safekeepers, migrate the timeline to it.
2. Stop the other safekeepers. Validate that the insert is successful.
3. Start the other safekeepers again and go to the next safekeeper.
4. Validate that the table contains all inserted values.
"""
neon_env_builder.num_safekeepers = 3
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
"timeline_safekeeper_count": 1,
}
env = neon_env_builder.init_start()
# TODO(diko): pageserver spams with various errors during safekeeper migration.
# Fix the code so it handles the migration better.
env.pageserver.allowed_errors.extend(
[
".*Timeline .* was cancelled and cannot be used anymore.*",
".*Timeline .* has been deleted.*",
".*wal receiver task finished with an error.*",
]
)
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["new_sk_set"] is None
assert len(mconf["sk_set"]) == 1
assert mconf["generation"] == 1
ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"])
ep.safe_psql("CREATE EXTENSION neon_test_utils;")
ep.safe_psql("CREATE TABLE t(a int)")
for active_sk in range(1, 4):
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, [active_sk]
)
other_sks = [sk for sk in range(1, 4) if sk != active_sk]
for sk in other_sks:
env.safekeepers[sk - 1].stop()
ep.safe_psql(f"INSERT INTO t VALUES ({active_sk})")
for sk in other_sks:
env.safekeepers[sk - 1].start()
ep.clear_buffers()
assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)]
# 1 initial generation + 2 migrations on each loop iteration.
expected_gen = 1 + 2 * 3
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["generation"] == expected_gen
assert ep.safe_psql("SHOW neon.safekeepers")[0][0].startswith(f"g#{expected_gen}:")
# Restart and check again to make sure data is persistent.
ep.stop()
ep.start(safekeeper_generation=1, safekeepers=[3])
assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)]