mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 14:02:55 +00:00
Merge remote-tracking branch 'origin/main' into communicator-rewrite
This commit is contained in:
@@ -5291,16 +5291,32 @@ class EndpointFactory:
|
||||
)
|
||||
|
||||
def stop_all(self, fail_on_error=True) -> Self:
|
||||
exception = None
|
||||
for ep in self.endpoints:
|
||||
"""
|
||||
Stop all the endpoints in parallel.
|
||||
"""
|
||||
|
||||
# Note: raising an exception from a task in a task group cancels
|
||||
# all the other tasks. We don't want that, hence the 'stop_one'
|
||||
# function catches exceptions and puts them on the 'exceptions'
|
||||
# list for later processing.
|
||||
exceptions = []
|
||||
|
||||
async def stop_one(ep):
|
||||
try:
|
||||
ep.stop()
|
||||
await asyncio.to_thread(ep.stop)
|
||||
except Exception as e:
|
||||
log.error(f"Failed to stop endpoint {ep.endpoint_id}: {e}")
|
||||
exception = e
|
||||
exceptions.append(e)
|
||||
|
||||
if fail_on_error and exception is not None:
|
||||
raise exception
|
||||
async def async_stop_all():
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
for ep in self.endpoints:
|
||||
tg.create_task(stop_one(ep))
|
||||
|
||||
asyncio.run(async_stop_all())
|
||||
|
||||
if fail_on_error and exceptions:
|
||||
raise ExceptionGroup("stopping an endpoint failed", exceptions)
|
||||
|
||||
return self
|
||||
|
||||
|
||||
@@ -863,7 +863,6 @@ def test_pageserver_compaction_circuit_breaker(neon_env_builder: NeonEnvBuilder)
|
||||
assert not env.pageserver.log_contains(".*Circuit breaker failure ended.*")
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Lakebase mode")
|
||||
def test_ps_corruption_detection_feedback(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test that when the pageserver detects corruption during image layer creation,
|
||||
@@ -890,7 +889,9 @@ def test_ps_corruption_detection_feedback(neon_env_builder: NeonEnvBuilder):
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload = Workload(
|
||||
env, tenant_id, timeline_id, endpoint_opts={"config_lines": ["neon.lakebase_mode=true"]}
|
||||
)
|
||||
workload.init()
|
||||
|
||||
# Enable the failpoint that will cause image layer creation to fail due to a (simulated) detected
|
||||
|
||||
@@ -2757,18 +2757,37 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
|
||||
remote_storage_kind = s3_storage()
|
||||
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
|
||||
|
||||
# Set a very small disk usage limit (1KB)
|
||||
neon_env_builder.safekeeper_extra_opts = ["--max-timeline-disk-usage-bytes=1024"]
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Create a timeline and endpoint
|
||||
env.create_branch("test_timeline_disk_usage_limit")
|
||||
endpoint = env.endpoints.create_start("test_timeline_disk_usage_limit")
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_timeline_disk_usage_limit",
|
||||
config_lines=[
|
||||
"neon.lakebase_mode=true",
|
||||
],
|
||||
)
|
||||
|
||||
# Install the neon extension in the test database. We need it to query perf counter metrics.
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CREATE EXTENSION IF NOT EXISTS neon")
|
||||
# Sanity-check safekeeper connection status in neon_perf_counters in the happy case.
|
||||
cur.execute(
|
||||
"SELECT value FROM neon_perf_counters WHERE metric = 'num_active_safekeepers'"
|
||||
)
|
||||
assert cur.fetchone() == (1,), "Expected 1 active safekeeper"
|
||||
cur.execute(
|
||||
"SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
|
||||
)
|
||||
assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
|
||||
|
||||
# Get the safekeeper
|
||||
sk = env.safekeepers[0]
|
||||
|
||||
# Restart the safekeeper with a very small disk usage limit (1KB)
|
||||
sk.stop().start(["--max-timeline-disk-usage-bytes=1024"])
|
||||
|
||||
# Inject a failpoint to stop WAL backup
|
||||
with sk.http_client() as http_cli:
|
||||
http_cli.configure_failpoints([("backup-lsn-range-pausable", "pause")])
|
||||
@@ -2794,6 +2813,18 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
|
||||
wait_until(error_logged)
|
||||
log.info("Found expected error message in compute log, resuming.")
|
||||
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# Confirm that neon_perf_counters also indicates that there are no active safekeepers
|
||||
cur.execute(
|
||||
"SELECT value FROM neon_perf_counters WHERE metric = 'num_active_safekeepers'"
|
||||
)
|
||||
assert cur.fetchone() == (0,), "Expected 0 active safekeepers"
|
||||
cur.execute(
|
||||
"SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
|
||||
)
|
||||
assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
|
||||
|
||||
# Sanity check that the hanging insert is indeed still hanging. Otherwise means the circuit breaker we
|
||||
# implemented didn't work as expected.
|
||||
time.sleep(2)
|
||||
|
||||
Reference in New Issue
Block a user