diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index 7cdf621737..71514daa7c 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -97,7 +97,19 @@ impl ComputeControlPlane {
         for endpoint_dir in std::fs::read_dir(env.endpoints_path())
             .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
         {
-            let ep = Endpoint::from_dir_entry(endpoint_dir?, &env)?;
+            let ep = match Endpoint::from_dir_entry(endpoint_dir?, &env) {
+                Ok(ep) => ep,
+                // Inspect the failure: only a `NotFound` I/O error is tolerated here.
+                Err(e) => match e.downcast::<std::io::Error>() {
+                    // A parallel task could delete an endpoint while we have just scanned
+                    // the directory, so a vanished entry is skipped rather than reported.
+                    Ok(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
+                    // Any other I/O error is propagated to the caller.
+                    Ok(e) => Err(e)?,
+                    // Not an I/O error at all: propagate it unchanged.
+                    Err(e) => Err(e)?,
+                },
+            };
             endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
         }
 
diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py
index 95dc0fec78..4a16535941 100644
--- a/test_runner/regress/test_tenants.py
+++ b/test_runner/regress/test_tenants.py
@@ -2,6 +2,7 @@ from __future__ import annotations
 
 import concurrent.futures
 import os
+import threading
 import time
 from contextlib import closing
 from datetime import datetime
@@ -10,7 +11,7 @@ from pathlib import Path
 
 import pytest
 import requests
-from fixtures.common_types import Lsn, TenantId
+from fixtures.common_types import Lsn, TenantId, TimelineId
 from fixtures.log_helper import log
 from fixtures.metrics import (
     PAGESERVER_GLOBAL_METRICS,
@@ -476,3 +477,38 @@ def test_pageserver_metrics_many_relations(neon_env_builder: NeonEnvBuilder):
     assert counts
     log.info(f"directory counts: {counts}")
     assert counts[2] > COUNT_AT_LEAST_EXPECTED
+
+
+def test_timelines_parallel_endpoints(neon_simple_env: NeonEnv):
+    """
+    (Relaxed) regression test for issue that led to https://github.com/neondatabase/neon/pull/9268
+    Create many endpoints in parallel and then restart them
+    """
+    env = neon_simple_env
+
+    # This param needs to be 200+ to reproduce the limit issue
+    n_threads = 16
+    barrier = threading.Barrier(n_threads)
+
+    # NOTE(review): timeline_id is accepted but not used by the worker body.
+    def test_timeline(branch_name: str, timeline_id: TimelineId):
+        endpoint = env.endpoints.create_start(branch_name)
+        endpoint.stop()
+        # Use a barrier to make sure we restart endpoints at the same time
+        barrier.wait()
+        endpoint.start()
+
+    workers = []
+
+    # Branches are created one at a time here; only the endpoint work runs in the threads.
+    for i in range(0, n_threads):
+        branch_name = f"branch_{i}"
+        timeline_id = env.create_branch(branch_name)
+        w = threading.Thread(target=test_timeline, args=[branch_name, timeline_id])
+        workers.append(w)
+        w.start()
+
+    # NOTE(review): Thread.join() does not re-raise exceptions from the worker, so a
+    # failure inside test_timeline will not by itself fail this test.
+    for w in workers:
+        w.join()