mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 04:50:38 +00:00
tests: failure cases for shard splitting
This commit is contained in:
@@ -112,7 +112,7 @@ class Workload:
|
||||
else:
|
||||
return False
|
||||
|
||||
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True):
|
||||
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True, ingest=True):
|
||||
assert self.expect_rows >= n
|
||||
|
||||
max_iters = 10
|
||||
@@ -150,22 +150,28 @@ class Workload:
|
||||
]
|
||||
)
|
||||
|
||||
for tenant_shard_id, pageserver in tenant_get_shards(
|
||||
self.env, self.tenant_id, pageserver_id
|
||||
):
|
||||
last_flush_lsn = wait_for_last_flush_lsn(
|
||||
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
|
||||
)
|
||||
ps_http = pageserver.http_client()
|
||||
wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
|
||||
if ingest:
|
||||
# Wait for written data to be ingested by the pageserver
|
||||
for tenant_shard_id, pageserver in tenant_get_shards(
|
||||
self.env, self.tenant_id, pageserver_id
|
||||
):
|
||||
last_flush_lsn = wait_for_last_flush_lsn(
|
||||
self.env,
|
||||
endpoint,
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
pageserver_id=pageserver_id,
|
||||
)
|
||||
ps_http = pageserver.http_client()
|
||||
wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
|
||||
|
||||
if upload:
|
||||
# force a checkpoint to trigger upload
|
||||
ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
|
||||
wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
|
||||
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
|
||||
else:
|
||||
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
|
||||
if upload:
|
||||
# Wait for written data to be uploaded to S3 (force a checkpoint to trigger upload)
|
||||
ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
|
||||
wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
|
||||
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
|
||||
else:
|
||||
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
|
||||
|
||||
def validate(self, pageserver_id: Optional[int] = None):
|
||||
endpoint = self.endpoint(pageserver_id)
|
||||
|
||||
@@ -1,13 +1,17 @@
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
AttachmentServiceApiException,
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
tenant_get_shards,
|
||||
)
|
||||
from fixtures.remote_storage import s3_storage
|
||||
from fixtures.types import Lsn, TenantShardId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
from fixtures.workload import Workload
|
||||
|
||||
|
||||
@@ -400,3 +404,245 @@ def test_sharding_ingest(
|
||||
|
||||
# Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
|
||||
assert huge_layer_count <= shard_count
|
||||
|
||||
|
||||
class Failure:
|
||||
pageserver_id: Optional[int]
|
||||
|
||||
def apply(self, env: NeonEnv):
|
||||
raise NotImplementedError()
|
||||
|
||||
def clear(self, env: NeonEnv):
|
||||
"""
|
||||
Clear the failure, in a way that should enable the system to proceed
|
||||
to a totally clean state (all nodes online and reconciled)
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def expect_available(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def can_mitigate(self):
|
||||
"""Whether Self.mitigate is available for use"""
|
||||
return False
|
||||
|
||||
def mitigate(self, env: NeonEnv):
|
||||
"""
|
||||
Mitigate the failure in a way that should allow shard split to
|
||||
complete and service to resume, but does not guarantee to leave
|
||||
the whole world in a clean state (e.g. an Offline node might have
|
||||
junk LocationConfigs on it)
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def fails_forward(self):
|
||||
"""
|
||||
If true, this failure results in a state that eventualy completes the split.
|
||||
"""
|
||||
return False
|
||||
|
||||
|
||||
class PageserverFailpoint(Failure):
|
||||
def __init__(self, failpoint, pageserver_id, mitigate):
|
||||
self.failpoint = failpoint
|
||||
self.pageserver_id = pageserver_id
|
||||
self._mitigate = mitigate
|
||||
|
||||
def apply(self, env: NeonEnv):
|
||||
pageserver = env.get_pageserver(self.pageserver_id)
|
||||
pageserver.allowed_errors.extend(
|
||||
[".*failpoint.*", ".*Resetting.*after shard split failure.*"]
|
||||
)
|
||||
pageserver.http_client().configure_failpoints((self.failpoint, "return(1)"))
|
||||
|
||||
def clear(self, env: NeonEnv):
|
||||
pageserver = env.get_pageserver(self.pageserver_id)
|
||||
pageserver.http_client().configure_failpoints((self.failpoint, "off"))
|
||||
if self._mitigate:
|
||||
env.attachment_service.node_configure(self.pageserver_id, {"availability": "Active"})
|
||||
|
||||
def expect_available(self):
|
||||
return True
|
||||
|
||||
def can_mitigate(self):
|
||||
return self._mitigate
|
||||
|
||||
def mitigate(self, env):
|
||||
env.attachment_service.node_configure(self.pageserver_id, {"availability": "Offline"})
|
||||
|
||||
|
||||
class StorageControllerFailpoint(Failure):
|
||||
def __init__(self, failpoint):
|
||||
self.failpoint = failpoint
|
||||
self.pageserver_id = None
|
||||
|
||||
def apply(self, env: NeonEnv):
|
||||
env.attachment_service.configure_failpoints((self.failpoint, "return(1)"))
|
||||
|
||||
def clear(self, env: NeonEnv):
|
||||
env.attachment_service.configure_failpoints((self.failpoint, "off"))
|
||||
|
||||
def expect_available(self):
|
||||
return True
|
||||
|
||||
def can_mitigate(self):
|
||||
return False
|
||||
|
||||
def fails_forward(self):
|
||||
# Edge case: the very last failpoint that simulates a DB connection error, where
|
||||
# the abort path will fail-forward and result in a complete split.
|
||||
return self.failpoint == "shard-split-post-complete"
|
||||
|
||||
|
||||
class NodeKill(Failure):
|
||||
def __init__(self, pageserver_id, mitigate):
|
||||
self.pageserver_id = pageserver_id
|
||||
self._mitigate = mitigate
|
||||
|
||||
def apply(self, env: NeonEnv):
|
||||
pageserver = env.get_pageserver(self.pageserver_id)
|
||||
pageserver.stop(immediate=True)
|
||||
|
||||
def clear(self, env: NeonEnv):
|
||||
pageserver = env.get_pageserver(self.pageserver_id)
|
||||
pageserver.start()
|
||||
|
||||
def expect_available(self):
|
||||
return False
|
||||
|
||||
def mitigate(self, env):
|
||||
env.attachment_service.node_configure(self.pageserver_id, {"availability": "Offline"})
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"failure",
|
||||
[
|
||||
PageserverFailpoint("api-500", 1, False),
|
||||
NodeKill(1, False),
|
||||
PageserverFailpoint("api-500", 1, True),
|
||||
NodeKill(1, True),
|
||||
PageserverFailpoint("shard-split-pre-prepare", 1, False),
|
||||
PageserverFailpoint("shard-split-post-prepare", 1, False),
|
||||
PageserverFailpoint("shard-split-pre-hardlink", 1, False),
|
||||
PageserverFailpoint("shard-split-post-hardlink", 1, False),
|
||||
PageserverFailpoint("shard-split-post-child-conf", 1, False),
|
||||
PageserverFailpoint("shard-split-lsn-wait", 1, False),
|
||||
PageserverFailpoint("shard-split-pre-finish", 1, False),
|
||||
StorageControllerFailpoint("shard-split-validation"),
|
||||
StorageControllerFailpoint("shard-split-post-begin"),
|
||||
StorageControllerFailpoint("shard-split-post-remote"),
|
||||
StorageControllerFailpoint("shard-split-post-complete"),
|
||||
],
|
||||
)
|
||||
def test_sharding_split_failures(neon_env_builder: NeonEnvBuilder, failure: Failure):
|
||||
neon_env_builder.num_pageservers = 4
|
||||
initial_shard_count = 2
|
||||
split_shard_count = 4
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_shard_count=initial_shard_count)
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
# Make sure the node we're failing has a shard on it, otherwise the test isn't testing anything
|
||||
assert (
|
||||
failure.pageserver_id is None
|
||||
or len(
|
||||
env.get_pageserver(failure.pageserver_id)
|
||||
.http_client()
|
||||
.tenant_list_locations()["tenant_shards"]
|
||||
)
|
||||
> 0
|
||||
)
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init()
|
||||
workload.write_rows(100)
|
||||
|
||||
# Set one pageserver to 500 all requests, then do a split
|
||||
# TODO: also test with a long-blocking failure: controller should time out its request and then
|
||||
# clean up in a well defined way.
|
||||
failure.apply(env)
|
||||
|
||||
with pytest.raises(AttachmentServiceApiException):
|
||||
env.attachment_service.tenant_shard_split(tenant_id, shard_count=4)
|
||||
|
||||
# We expect that the overall operation will fail, but some split requests
|
||||
# will have succeeded: the net result should be to return to a clean state, including
|
||||
# detaching any child shards.
|
||||
def assert_rolled_back(exclude_ps_id=None) -> None:
|
||||
count = 0
|
||||
for ps in env.pageservers:
|
||||
if exclude_ps_id is not None and ps.id == exclude_ps_id:
|
||||
continue
|
||||
|
||||
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
|
||||
for loc in locations:
|
||||
tenant_shard_id = TenantShardId.parse(loc[0])
|
||||
log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
|
||||
assert tenant_shard_id.shard_count == initial_shard_count
|
||||
count += 1
|
||||
assert count == initial_shard_count
|
||||
|
||||
def assert_split_done(exclude_ps_id=None) -> None:
|
||||
count = 0
|
||||
for ps in env.pageservers:
|
||||
if exclude_ps_id is not None and ps.id == exclude_ps_id:
|
||||
continue
|
||||
|
||||
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
|
||||
for loc in locations:
|
||||
tenant_shard_id = TenantShardId.parse(loc[0])
|
||||
log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
|
||||
assert tenant_shard_id.shard_count == split_shard_count
|
||||
count += 1
|
||||
assert count == split_shard_count
|
||||
|
||||
def finish_split():
|
||||
# Having failed+rolled back, we should be able to split again
|
||||
# No failures this time; it will succeed
|
||||
env.attachment_service.tenant_shard_split(tenant_id, shard_count=split_shard_count)
|
||||
|
||||
workload.churn_rows(10)
|
||||
workload.validate()
|
||||
|
||||
if failure.expect_available():
|
||||
# Even though the split failed partway through, this should not have interrupted
|
||||
# clients. Disable waiting for pageservers in the workload helper, because our
|
||||
# failpoints may prevent API access.
|
||||
# This only applies for failure modes that leave pageserver page_service API available.
|
||||
workload.churn_rows(10, upload=False, ingest=False)
|
||||
workload.validate()
|
||||
|
||||
if failure.fails_forward():
|
||||
# A failure type which results in eventual completion of the split
|
||||
wait_until(30, 1, assert_split_done)
|
||||
elif failure.can_mitigate():
|
||||
# Mitigation phase: we expect to be able to proceed with a successful shard split
|
||||
failure.mitigate(env)
|
||||
|
||||
# The split should appear to be rolled back from the point of view of all pageservers
|
||||
# apart from the one that is offline
|
||||
wait_until(30, 1, lambda: assert_rolled_back(exclude_ps_id=failure.pageserver_id))
|
||||
|
||||
finish_split()
|
||||
wait_until(30, 1, lambda: assert_split_done(exclude_ps_id=failure.pageserver_id))
|
||||
|
||||
# Having cleared the failure, everything should converge to a pristine state
|
||||
failure.clear(env)
|
||||
wait_until(30, 1, assert_split_done)
|
||||
else:
|
||||
# Once we restore the faulty pageserver's API to good health, rollback should
|
||||
# eventually complete.
|
||||
failure.clear(env)
|
||||
|
||||
wait_until(30, 1, assert_rolled_back)
|
||||
|
||||
# Having rolled back, the tenant should be working
|
||||
workload.churn_rows(10)
|
||||
workload.validate()
|
||||
|
||||
# Splitting again should work, since we cleared the failure
|
||||
finish_split()
|
||||
assert_split_done()
|
||||
|
||||
env.attachment_service.consistency_check()
|
||||
|
||||
Reference in New Issue
Block a user