diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 47b77f0720..1b231151ce 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -236,7 +236,7 @@ impl Default for NeonStorageControllerConf { heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL, long_reconcile_threshold: None, use_https_pageserver_api: false, - timelines_onto_safekeepers: false, + timelines_onto_safekeepers: true, use_https_safekeeper_api: false, use_local_compute_notifications: true, } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 8cf1020adb..050d61055e 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -489,7 +489,9 @@ class NeonEnvBuilder: self.config_init_force: str | None = None self.top_output_dir = top_output_dir self.control_plane_hooks_api: str | None = None - self.storage_controller_config: dict[Any, Any] | None = None + self.storage_controller_config: dict[Any, Any] | None = { + "timelines_onto_safekeepers": True, + } # Flag to enable https listener in pageserver, generate local ssl certs, # and force storage controller to use https for pageserver api. @@ -4909,6 +4911,9 @@ class Safekeeper(LogUtils): log.info(f"finished pulling timeline from {src_ids} to {self.id}") return res + def safekeeper_id(self) -> SafekeeperId: + return SafekeeperId(self.id, "localhost", self.port.pg_tenant_only) + @property def data_dir(self) -> Path: return self.env.repo_dir / "safekeepers" / f"sk{self.id}" diff --git a/test_runner/regress/test_branching.py b/test_runner/regress/test_branching.py index 9ce618b2ad..fa5c9aa693 100644 --- a/test_runner/regress/test_branching.py +++ b/test_runner/regress/test_branching.py @@ -11,6 +11,7 @@ from fixtures.common_types import Lsn, TimelineId from fixtures.log_helper import log from fixtures.pageserver.http import PageserverApiException from fixtures.pageserver.utils import wait_until_tenant_active +from fixtures.safekeeper.http import MembershipConfiguration, TimelineCreateRequest from fixtures.utils import query_scalar from performance.test_perf_pgbench import get_scales_matrix from requests import RequestException @@ -164,6 +165,19 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE ps_http.configure_failpoints(("before-upload-index-pausable", "pause")) env.pageserver.tenant_create(env.initial_tenant) + sk = env.safekeepers[0] + assert sk + sk.http_client().timeline_create( + TimelineCreateRequest( + env.initial_tenant, + env.initial_timeline, + MembershipConfiguration(generation=1, members=[sk.safekeeper_id()], new_members=None), + int(env.pg_version), + Lsn(0), + None, + ) + ) + initial_branch = "initial_branch" def start_creating_timeline(): diff --git a/test_runner/regress/test_normal_work.py b/test_runner/regress/test_normal_work.py index 44590ea4b9..3335cf686c 100644 --- a/test_runner/regress/test_normal_work.py +++ b/test_runner/regress/test_normal_work.py @@ -64,6 +64,11 @@ def test_normal_work( """ neon_env_builder.num_safekeepers = num_safekeepers + + if safekeeper_proto_version == 2: + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } env = neon_env_builder.init_start() pageserver_http = env.pageserver.http_client() diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py index 2590a3fe9d..2b71662669 100644 --- a/test_runner/regress/test_ondemand_download.py +++ b/test_runner/regress/test_ondemand_download.py @@ -671,6 +671,12 @@ def test_layer_download_cancelled_by_config_location(neon_env_builder: NeonEnvBu """ neon_env_builder.enable_pageserver_remote_storage(s3_storage()) + # On the new mode, the test runs into a cancellation issue, i.e. the walproposer can't shut down + # as it is hang-waiting on the timeline_checkpoint call in WalIngest::new. + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + # turn off background tasks so that they don't interfere with the downloads env = neon_env_builder.init_start( initial_tenant_conf={ diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 8f3aa010e3..74ba74645e 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -88,6 +88,12 @@ def test_storage_controller_smoke( neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api env = neon_env_builder.init_configs() + # These bubble up from safekeepers + for ps in env.pageservers: + ps.allowed_errors.extend( + [".*Timeline.* has been deleted.*", ".*Timeline.*was cancelled and cannot be used"] + ) + # Start services by hand so that we can skip a pageserver (this will start + register later) env.broker.start() env.storage_controller.start() @@ -3455,7 +3461,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): assert target.get_safekeeper(fake_id) is None - assert len(target.get_safekeepers()) == 0 + start_sks = target.get_safekeepers() sk_0 = env.safekeepers[0] @@ -3477,7 +3483,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): inserted = target.get_safekeeper(fake_id) assert inserted is not None - assert target.get_safekeepers() == [inserted] + assert target.get_safekeepers() == start_sks + [inserted] assert eq_safekeeper_records(body, inserted) # error out if pk is changed (unexpected) @@ -3489,7 +3495,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): assert exc.value.status_code == 400 inserted_again = target.get_safekeeper(fake_id) - assert target.get_safekeepers() == [inserted_again] + assert target.get_safekeepers() == start_sks + [inserted_again] assert inserted_again is not None assert eq_safekeeper_records(inserted, inserted_again) @@ -3498,7 +3504,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): body["version"] += 1 target.on_safekeeper_deploy(fake_id, body) inserted_now = target.get_safekeeper(fake_id) - assert target.get_safekeepers() == [inserted_now] + assert target.get_safekeepers() == start_sks + [inserted_now] assert inserted_now is not None assert eq_safekeeper_records(body, inserted_now) @@ -3507,7 +3513,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): body["https_port"] = 123 target.on_safekeeper_deploy(fake_id, body) inserted_now = target.get_safekeeper(fake_id) - assert target.get_safekeepers() == [inserted_now] + assert target.get_safekeepers() == start_sks + [inserted_now] assert inserted_now is not None assert eq_safekeeper_records(body, inserted_now) env.storage_controller.consistency_check() @@ -3516,7 +3522,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): body["https_port"] = None target.on_safekeeper_deploy(fake_id, body) inserted_now = target.get_safekeeper(fake_id) - assert target.get_safekeepers() == [inserted_now] + assert target.get_safekeepers() == start_sks + [inserted_now] assert inserted_now is not None assert eq_safekeeper_records(body, inserted_now) env.storage_controller.consistency_check() @@ -3635,6 +3641,9 @@ def test_timeline_delete_mid_live_migration(neon_env_builder: NeonEnvBuilder, mi env = neon_env_builder.init_configs() env.start() + for ps in env.pageservers: + ps.allowed_errors.append(".*Timeline.* has been deleted.*") + tenant_id = TenantId.generate() timeline_id = TimelineId.generate() env.storage_controller.tenant_create(tenant_id, placement_policy={"Attached": 1}) diff --git a/test_runner/regress/test_storage_scrubber.py b/test_runner/regress/test_storage_scrubber.py index 03cd133ccb..e29cb801d5 100644 --- a/test_runner/regress/test_storage_scrubber.py +++ b/test_runner/regress/test_storage_scrubber.py @@ -341,6 +341,11 @@ def test_scrubber_physical_gc_timeline_deletion(neon_env_builder: NeonEnvBuilder env = neon_env_builder.init_configs() env.start() + for ps in env.pageservers: + ps.allowed_errors.extend( + [".*Timeline.* has been deleted.*", ".*Timeline.*was cancelled and cannot be used"] + ) + tenant_id = TenantId.generate() timeline_id = TimelineId.generate() env.create_tenant( diff --git a/test_runner/regress/test_timeline_detach_ancestor.py b/test_runner/regress/test_timeline_detach_ancestor.py index f0810270b1..c58f78aeb1 100644 --- a/test_runner/regress/test_timeline_detach_ancestor.py +++ b/test_runner/regress/test_timeline_detach_ancestor.py @@ -21,7 +21,10 @@ from fixtures.neon_fixtures import ( last_flush_lsn_upload, wait_for_last_flush_lsn, ) -from fixtures.pageserver.http import HistoricLayerInfo, PageserverApiException +from fixtures.pageserver.http import ( + HistoricLayerInfo, + PageserverApiException, +) from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_timeline_detail_404 from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind from fixtures.utils import assert_pageserver_backups_equal, skip_in_debug_build, wait_until @@ -413,6 +416,7 @@ def test_ancestor_detach_behavior_v2(neon_env_builder: NeonEnvBuilder, snapshots "read_only": True, }, ) + sk = env.safekeepers[0] assert sk with pytest.raises(requests.exceptions.HTTPError, match="Not Found"): @@ -504,8 +508,15 @@ def test_ancestor_detach_behavior_v2(neon_env_builder: NeonEnvBuilder, snapshots assert len(lineage.get("original_ancestor", [])) == 0 assert len(lineage.get("reparenting_history", [])) == 0 - for name, _, _, rows, starts in expected_result: - with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep: + for branch_name, queried_timeline, _, rows, starts in expected_result: + details = client.timeline_detail(env.initial_tenant, queried_timeline) + log.info(f"reading data from branch {branch_name}") + # specifying the lsn makes the endpoint read-only and not connect to safekeepers + with env.endpoints.create( + branch_name, + lsn=Lsn(details["last_record_lsn"]), + ) as ep: + ep.start(safekeeper_generation=1) assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows assert ep.safe_psql(f"SELECT count(*) FROM audit WHERE starts = {starts}")[0][0] == 1 @@ -1088,6 +1099,7 @@ def test_timeline_detach_ancestor_interrupted_by_deletion( for ps in env.pageservers: ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS) + ps.allowed_errors.append(".*Timeline.* has been deleted.*") pageservers = dict((int(p.id), p) for p in env.pageservers) @@ -1209,6 +1221,7 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv for ps in env.pageservers: ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS) + ps.allowed_errors.append(".*Timeline.* has been deleted.*") pageservers = dict((int(p.id), p) for p in env.pageservers) diff --git a/test_runner/regress/test_timeline_gc_blocking.py b/test_runner/regress/test_timeline_gc_blocking.py index 9a710f5b80..8ef64a0742 100644 --- a/test_runner/regress/test_timeline_gc_blocking.py +++ b/test_runner/regress/test_timeline_gc_blocking.py @@ -24,6 +24,8 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"}, initial_tenant_shard_count=2 if sharded else None, ) + for ps in env.pageservers: + ps.allowed_errors.append(".*Timeline.* has been deleted.*") if sharded: http = env.storage_controller.pageserver_api() diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index b9183286af..ea120c1814 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -229,7 +229,7 @@ def test_many_timelines(neon_env_builder: NeonEnvBuilder): # Test timeline_list endpoint. http_cli = env.safekeepers[0].http_client() - assert len(http_cli.timeline_list()) == 3 + assert len(http_cli.timeline_list()) == 4 # Check that dead minority doesn't prevent the commits: execute insert n_inserts @@ -740,8 +740,8 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): env = neon_env_builder.init_start() tenant_id = env.initial_tenant - timeline_id = env.create_branch("test_timeline_status") - endpoint = env.endpoints.create_start("test_timeline_status") + timeline_id = env.initial_timeline + endpoint = env.endpoints.create_start("main") wa = env.safekeepers[0] @@ -1292,6 +1292,12 @@ def test_lagging_sk(neon_env_builder: NeonEnvBuilder): # it works without compute at all. def test_peer_recovery(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 3 + + # timelines should be created the old way + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + env = neon_env_builder.init_start() tenant_id = env.initial_tenant @@ -1532,6 +1538,11 @@ def test_safekeeper_without_pageserver( def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder): + # timelines should be created the old way manually until we have migration support + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + def execute_payload(endpoint: Endpoint): with closing(endpoint.connect()) as conn: with conn.cursor() as cur: @@ -1661,6 +1672,15 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder, live_sk_change: bool): res = env.safekeepers[3].pull_timeline( [env.safekeepers[0], env.safekeepers[2]], tenant_id, timeline_id ) + sk_id_1 = env.safekeepers[0].safekeeper_id() + sk_id_3 = env.safekeepers[2].safekeeper_id() + sk_id_4 = env.safekeepers[3].safekeeper_id() + new_conf = MembershipConfiguration( + generation=2, members=[sk_id_1, sk_id_3, sk_id_4], new_members=None + ) + for i in [0, 2, 3]: + env.safekeepers[i].http_client().membership_switch(tenant_id, timeline_id, new_conf) + log.info("Finished pulling timeline") log.info(res) @@ -1705,13 +1725,15 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 3 neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage()) env = neon_env_builder.init_start() - tenant_id = env.initial_tenant - timeline_id = env.initial_timeline (src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2]) + dst_sk.stop() + + [tenant_id, timeline_id] = env.create_tenant() + log.info("use only first 2 safekeepers, 3rd will be seeded") - endpoint = env.endpoints.create("main") + endpoint = env.endpoints.create("main", tenant_id=tenant_id) endpoint.active_safekeepers = [1, 2] endpoint.start() endpoint.safe_psql("create table t(key int, value text)") @@ -1723,6 +1745,7 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder): src_http = src_sk.http_client() # run pull_timeline which will halt before downloading files src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause")) + dst_sk.start() pt_handle = PropagatingThread( target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id) ) @@ -1782,23 +1805,27 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder): neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage()) env = neon_env_builder.init_start() tenant_id = env.initial_tenant - timeline_id = env.initial_timeline (src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2]) + dst_sk.stop() + src_http = src_sk.http_client() + src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause")) + + timeline_id = env.create_branch("pull_timeline_term_changes") + + # run pull_timeline which will halt before downloading files log.info("use only first 2 safekeepers, 3rd will be seeded") - ep = env.endpoints.create("main") + ep = env.endpoints.create("pull_timeline_term_changes") ep.active_safekeepers = [1, 2] ep.start() ep.safe_psql("create table t(key int, value text)") ep.safe_psql("insert into t select generate_series(1, 1000), 'pear'") - src_http = src_sk.http_client() - # run pull_timeline which will halt before downloading files - src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause")) pt_handle = PropagatingThread( target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id) ) + dst_sk.start() pt_handle.start() src_sk.wait_until_paused("sk-snapshot-after-list-pausable") @@ -1807,7 +1834,7 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder): # restart compute to bump term ep.stop() - ep = env.endpoints.create("main") + ep = env.endpoints.create("pull_timeline_term_changes") ep.active_safekeepers = [1, 2] ep.start() ep.safe_psql("insert into t select generate_series(1, 100), 'pear'") @@ -1929,6 +1956,11 @@ def test_pull_timeline_while_evicted(neon_env_builder: NeonEnvBuilder): @run_only_on_default_postgres("tests only safekeeper API") def test_membership_api(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 1 + # timelines should be created the old way + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + env = neon_env_builder.init_start() # These are expected after timeline deletion on safekeepers. @@ -2009,6 +2041,12 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder): created manually, later storcon will do that. """ neon_env_builder.num_safekeepers = 3 + + # timelines should be created the old way manually + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + env = neon_env_builder.init_start() tenant_id = env.initial_tenant @@ -2064,7 +2102,7 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder): env = neon_env_builder.init_start() tenant_id = env.initial_tenant - timeline_id = env.create_branch("test_idle_reconnections") + timeline_id = env.initial_timeline def collect_stats() -> dict[str, float]: # we need to collect safekeeper_pg_queries_received_total metric from all safekeepers @@ -2095,7 +2133,7 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder): collect_stats() - endpoint = env.endpoints.create_start("test_idle_reconnections") + endpoint = env.endpoints.create_start("main") # just write something to the timeline endpoint.safe_psql("create table t(i int)") collect_stats() diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index d8a7dc2a2b..1bad387a90 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -590,6 +590,13 @@ async def run_wal_truncation(env: NeonEnv, safekeeper_proto_version: int): @pytest.mark.parametrize("safekeeper_proto_version", [2, 3]) def test_wal_truncation(neon_env_builder: NeonEnvBuilder, safekeeper_proto_version: int): neon_env_builder.num_safekeepers = 3 + if safekeeper_proto_version == 2: + # On the legacy protocol, we don't support generations, which are part of + # `timelines_onto_safekeepers` + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + env = neon_env_builder.init_start() asyncio.run(run_wal_truncation(env, safekeeper_proto_version)) @@ -713,6 +720,11 @@ async def run_quorum_sanity(env: NeonEnv): # we don't. def test_quorum_sanity(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 4 + + # The test fails basically always on the new mode. + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } env = neon_env_builder.init_start() asyncio.run(run_quorum_sanity(env)) diff --git a/test_runner/regress/test_wal_receiver.py b/test_runner/regress/test_wal_receiver.py index 0252b590cc..d281c055b0 100644 --- a/test_runner/regress/test_wal_receiver.py +++ b/test_runner/regress/test_wal_receiver.py @@ -16,6 +16,13 @@ if TYPE_CHECKING: # Checks that pageserver's walreceiver state is printed in the logs during WAL wait timeout. # Ensures that walreceiver does not run without any data inserted and only starts after the insertion. def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder): + # we assert below that the walreceiver is not active before data writes. + # with manually created timelines, it is active. + # FIXME: remove this test once we remove timelines_onto_safekeepers + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + # Trigger WAL wait timeout faster neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'" env = neon_env_builder.init_start()