From 76088c16d26dacec1086a4e2ba7359678c17588e Mon Sep 17 00:00:00 2001
From: John Spray
Date: Fri, 21 Mar 2025 08:48:56 +0000
Subject: [PATCH] storcon: reproduce shard split issue (#11290)

## Problem

Issue https://github.com/neondatabase/neon/issues/11254 describes a case
where a restart during a shard split can result in a bad end state in the
database.

## Summary of changes

- Add a reproducer for the issue
- Tighten an existing safety check around updated row counts in
  complete_shard_split
---
 storage_controller/src/persistence.rs          |   3 +-
 storage_controller/src/service.rs              |   2 +
 test_runner/fixtures/neon_fixtures.py          |  16 +-
 .../regress/test_storage_controller.py         | 137 ++++++++++++++++++
 4 files changed, 155 insertions(+), 3 deletions(-)

diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs
index 0e6f80a060..c927b7c366 100644
--- a/storage_controller/src/persistence.rs
+++ b/storage_controller/src/persistence.rs
@@ -997,10 +997,11 @@ impl Persistence {
             // Clear sharding flag
             let updated = diesel::update(tenant_shards)
                 .filter(tenant_id.eq(split_tenant_id.to_string()))
+                .filter(shard_count.eq(new_shard_count.literal() as i32))
                 .set((splitting.eq(0),))
                 .execute(conn)
                 .await?;
-            debug_assert!(updated > 0);
+            assert!(updated == new_shard_count.count() as usize);

             Ok(())
         })
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index a557187879..40915bd753 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -5456,6 +5456,8 @@ impl Service {
             }
         }

+        pausable_failpoint!("shard-split-pre-complete");
+
         // TODO: if the pageserver restarted concurrently with our split API call,
         // the actual generation of the child shard might differ from the generation
         // we expect it to have. In order for our in-database generation to end up
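The persistence.rs change above is the tightened safety check named in the summary: clearing the `splitting` flag must touch exactly one row per child shard, so a crash that leaves stale parent rows or a partial set of children behind now trips the assertion instead of sliding past a `> 0` check. A toy Python model of that invariant follows; the row shape and helper are illustrative assumptions, not the controller's real Diesel schema.

```python
# Toy model of complete_shard_split's row-count invariant. ShardRow and the
# in-memory "table" are illustrative; the controller uses a Postgres
# tenant_shards table via Diesel.
from dataclasses import dataclass


@dataclass
class ShardRow:
    tenant_id: str
    shard_number: int
    shard_count: int
    splitting: int


def complete_shard_split(table: list[ShardRow], tenant: str, new_shard_count: int) -> None:
    updated = 0
    for row in table:
        # Filtering on shard_count mirrors the added Diesel .filter(): only
        # child rows at the new count should have their splitting flag cleared.
        if row.tenant_id == tenant and row.shard_count == new_shard_count:
            row.splitting = 0
            updated += 1
    # The old `updated > 0` check passes as long as *anything* was updated;
    # demanding exactly one updated row per child shard catches leftovers.
    assert updated == new_shard_count


# A healthy 4 -> 8 split leaves exactly 8 child rows to update:
complete_shard_split([ShardRow("t1", i, 8, 1) for i in range(8)], "t1", 8)
# If a restart rolled the children back, only parent rows remain, the update
# matches zero rows, and the assert fires, where the unfiltered `updated > 0`
# would happily have cleared the parents' flags instead:
# complete_shard_split([ShardRow("t1", i, 4, 1) for i in range(4)], "t1", 8)
```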
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index ce14ef6f3b..b6b00e6f7e 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1725,6 +1725,8 @@ class LogUtils:
             log.warning(f"Skipping log check: {logfile} does not exist")
             return None

+        log.info(f"Checking log {logfile} for pattern '{pattern}'")
+
         contains_re = re.compile(pattern)

         # XXX: Our rust logging machinery buffers the messages, so if you
@@ -2618,10 +2620,13 @@ class NeonProxiedStorageController(NeonStorageController):
         self.running = False
         return self

+    def instance_log_path(self, instance_id: int) -> Path:
+        return self.env.repo_dir / f"storage_controller_{instance_id}" / "storage_controller.log"
+
     def assert_no_errors(self):
         for instance_id in self.instances.keys():
             assert_no_errors(
-                self.env.repo_dir / f"storage_controller_{instance_id}" / "storage_controller.log",
+                self.instance_log_path(instance_id),
                 "storage_controller",
                 self.allowed_errors,
             )
@@ -2629,7 +2634,14 @@
     def log_contains(
         self, pattern: str, offset: None | LogCursor = None
     ) -> tuple[str, LogCursor] | None:
-        raise NotImplementedError()
+        for instance_id in self.instances.keys():
+            log_path = self.instance_log_path(instance_id)
+            checker = LogUtils(log_path)
+            found = checker.log_contains(pattern, offset)
+            if found is not None:
+                return found
+
+        return None


 @dataclass
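This fixture change gives the proxied (multi-instance) storage controller a working `log_contains`: it fans out across every instance's log and returns the first match. A standalone sketch of that search, under the assumption that each instance logs to `storage_controller_{instance_id}/storage_controller.log` as in `instance_log_path`:

```python
# Standalone sketch of the fan-out search the fixture now performs. The path
# layout and return type are simplified; the real fixture reuses LogUtils and
# returns a (line, LogCursor) tuple.
import re
from pathlib import Path


def log_contains_any(repo_dir: Path, instance_ids: list[int], pattern: str) -> str | None:
    contains_re = re.compile(pattern)
    for instance_id in instance_ids:
        log_path = repo_dir / f"storage_controller_{instance_id}" / "storage_controller.log"
        if not log_path.exists():
            continue
        with open(log_path) as f:
            for line in f:
                if contains_re.search(line):
                    return line  # first matching instance wins
    return None
```

First match wins, which is all the test below needs: it only asks whether some instance hit the failpoint, received state from the leader, or logged the split abort.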
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index 3822ac8e88..c37859bba9 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -2863,6 +2863,143 @@ def test_storage_controller_leadership_transfer(
     )


+def test_storage_controller_leadership_transfer_during_split(
+    neon_env_builder: NeonEnvBuilder,
+    storage_controller_proxy: StorageControllerProxy,
+    port_distributor: PortDistributor,
+):
+    """
+    Exercise a race between shard splitting and graceful leadership transfer. This is
+    a reproducer for https://github.com/neondatabase/neon/issues/11254
+    """
+    neon_env_builder.auth_enabled = True
+
+    neon_env_builder.num_pageservers = 3
+
+    neon_env_builder.storage_controller_config = {
+        "database_url": f"127.0.0.1:{port_distributor.get_port()}",
+        "start_as_candidate": True,
+    }
+
+    neon_env_builder.storage_controller_port_override = storage_controller_proxy.port()
+
+    storage_controller_1_port = port_distributor.get_port()
+    storage_controller_2_port = port_distributor.get_port()
+
+    storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
+
+    env = neon_env_builder.init_configs()
+    start_env(env, storage_controller_1_port)
+
+    assert (
+        env.storage_controller.get_leadership_status() == StorageControllerLeadershipStatus.LEADER
+    )
+    leader = env.storage_controller.get_leader()
+    assert leader["address"] == f"http://127.0.0.1:{storage_controller_1_port}/"
+
+    tenant_count = 2
+    shard_count = 4
+    tenants = set(TenantId.generate() for _ in range(0, tenant_count))
+
+    for tid in tenants:
+        env.storage_controller.tenant_create(
+            tid, shard_count=shard_count, placement_policy={"Attached": 1}
+        )
+    env.storage_controller.reconcile_until_idle()
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
+        # Start a shard split
+        env.storage_controller.allowed_errors.extend(
+            [".*Unexpected child shard count.*", ".*Enqueuing background abort.*"]
+        )
+        pause_failpoint = "shard-split-pre-complete"
+        env.storage_controller.configure_failpoints((pause_failpoint, "pause"))
+        split_fut = executor.submit(
+            env.storage_controller.tenant_shard_split, list(tenants)[0], shard_count * 2
+        )
+
+        def hit_failpoint():
+            log.info("Checking log for pattern...")
+            try:
+                assert env.storage_controller.log_contains(f".*at failpoint {pause_failpoint}.*")
+            except Exception:
+                log.exception("Failed to find pattern in log")
+                raise
+
+        wait_until(hit_failpoint, interval=0.1, status_interval=1.0)
+
+        env.storage_controller.start(
+            timeout_in_seconds=30, instance_id=2, base_port=storage_controller_2_port
+        )
+
+        def passed_split_abort():
+            try:
+                log.info("Checking log for pattern...")
+                assert env.storage_controller.log_contains(
+                    ".*Using observed state received from leader.*"
+                )
+            except Exception:
+                log.exception("Failed to find pattern in log")
+                raise
+
+        log.info("Awaiting split abort")
+        wait_until(passed_split_abort, interval=0.1, status_interval=1.0)
+        assert env.storage_controller.log_contains(".*Aborting shard split.*")
+
+        # Proxy is still talking to original controller here: disable its pause failpoint so
+        # that its shard split can run to completion.
+        log.info("Disabling failpoint")
+        # Bypass the proxy: the python test HTTPServer is single threaded and still blocked
+        # on handling the shard split request.
+        env.storage_controller.request(
+            "PUT",
+            f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
+            json=[{"name": "shard-split-pre-complete", "actions": "off"}],
+            headers=env.storage_controller.headers(TokenScope.ADMIN),
+        )
+
+        def previous_stepped_down():
+            assert (
+                env.storage_controller.get_leadership_status()
+                == StorageControllerLeadershipStatus.STEPPED_DOWN
+            )
+
+        log.info("Awaiting step down")
+        wait_until(previous_stepped_down)
+
+        # Let the shard split complete: this may happen _after_ the replacement has come up
+        # and tried to clean up the databases
+        log.info("Unblocking & awaiting shard split")
+        with pytest.raises(Exception, match="Unexpected child shard count"):
+            # This split fails when it tries to persist results, because it encounters
+            # changes already made by the new controller's abort-on-startup
+            split_fut.result()
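+
+    # By this point the original leader's split has failed against state already
+    # rewritten by the new controller's startup abort; the remaining steps route
+    # clients to the new leader and verify it converges to a consistent view.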
+    log.info("Routing to new leader")
+    storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_2_port}")
+
+    def new_becomes_leader():
+        assert (
+            env.storage_controller.get_leadership_status()
+            == StorageControllerLeadershipStatus.LEADER
+        )
+
+    wait_until(new_becomes_leader)
+    leader = env.storage_controller.get_leader()
+    assert leader["address"] == f"http://127.0.0.1:{storage_controller_2_port}/"
+
+    env.storage_controller.wait_until_ready()
+    env.storage_controller.consistency_check()
+
+    # Check that the stepped down instance forwards requests
+    # to the new leader while it's still running.
+    storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
+    env.storage_controller.tenant_shard_dump()
+    env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
+    status = env.storage_controller.node_status(env.pageservers[0].id)
+    assert status["scheduling"] == "Pause"
+
+
 def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder):
     # single unsharded tenant, two locations
     neon_env_builder.num_pageservers = 2
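Every asynchronous transition in the test is polled through `wait_until`: it retries a raising callable until it stops raising or a deadline passes. A hedged sketch follows; only the `interval` and `status_interval` keywords appear in the test above, so the default timeout and the logging details here are assumptions about the real fixture helper.

```python
# Approximate shape of the wait_until helper the test leans on. The real
# helper lives in the test fixtures; defaults here are assumptions.
import logging
import time

log = logging.getLogger(__name__)


def wait_until(check, timeout: float = 20.0, interval: float = 0.5, status_interval: float = 0.0):
    deadline = time.monotonic() + timeout
    last_status = time.monotonic()
    while True:
        try:
            return check()  # success: propagate the callable's return value
        except Exception:
            now = time.monotonic()
            if now >= deadline:
                raise  # surface the last failure once the deadline passes
            if status_interval and now - last_status >= status_interval:
                log.info("still waiting for %s", getattr(check, "__name__", check))
                last_status = now
            time.sleep(interval)
```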