Add test for fixed storage broker issue (#9311)

Adds a test for the (now fixed) storage broker limit issue, see #9268
for the description and #9299 for the fix.

Also fix a race condition with endpoint creation/starts running in parallel,
leading to file not found errors.
This commit is contained in:
Arpad Müller
2024-10-14 14:34:57 +02:00
committed by GitHub
parent 31b7703fa8
commit d92ff578c4
2 changed files with 48 additions and 2 deletions

View File

@@ -97,7 +97,21 @@ impl ComputeControlPlane {
for endpoint_dir in std::fs::read_dir(env.endpoints_path())
.with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
{
let ep = Endpoint::from_dir_entry(endpoint_dir?, &env)?;
let ep_res = Endpoint::from_dir_entry(endpoint_dir?, &env);
let ep = match ep_res {
Ok(ep) => ep,
Err(e) => match e.downcast::<std::io::Error>() {
Ok(e) => {
// A parallel task could delete an endpoint while we have just scanned the directory
if e.kind() == std::io::ErrorKind::NotFound {
continue;
} else {
Err(e)?
}
}
Err(e) => Err(e)?,
},
};
endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
}

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import concurrent.futures
import os
import threading
import time
from contextlib import closing
from datetime import datetime
@@ -10,7 +11,7 @@ from pathlib import Path
import pytest
import requests
from fixtures.common_types import Lsn, TenantId
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import (
PAGESERVER_GLOBAL_METRICS,
@@ -476,3 +477,34 @@ def test_pageserver_metrics_many_relations(neon_env_builder: NeonEnvBuilder):
assert counts
log.info(f"directory counts: {counts}")
assert counts[2] > COUNT_AT_LEAST_EXPECTED
def test_timelines_parallel_endpoints(neon_simple_env: NeonEnv):
"""
(Relaxed) regression test for issue that led to https://github.com/neondatabase/neon/pull/9268
Create many endpoints in parallel and then restart them
"""
env = neon_simple_env
# This param needs to be 200+ to reproduce the limit issue
n_threads = 16
barrier = threading.Barrier(n_threads)
def test_timeline(branch_name: str, timeline_id: TimelineId):
endpoint = env.endpoints.create_start(branch_name)
endpoint.stop()
# Use a barrier to make sure we restart endpoints at the same time
barrier.wait()
endpoint.start()
workers = []
for i in range(0, n_threads):
branch_name = f"branch_{i}"
timeline_id = env.create_branch(branch_name)
w = threading.Thread(target=test_timeline, args=[branch_name, timeline_id])
workers.append(w)
w.start()
for w in workers:
w.join()