storage controller: test for large shard counts (#7475)

## Problem

Storage controller was observed to have unexpectedly large memory
consumption when loaded with many thousands of shards.

This was recently fixed:
- https://github.com/neondatabase/neon/pull/7493

...but we need a general test that the controller is well behaved with
thousands of shards.

Closes: https://github.com/neondatabase/neon/issues/7460
Closes: https://github.com/neondatabase/neon/issues/7463

## Summary of changes

- Add test test_storage_controller_many_tenants to exercise the system's
behaviour with a more substantial workload. This test measures memory
consumption and reproduces #7460 before the other changes in this PR.
- Tweak reconcile_all's return value to make it nonzero if it spawns no
reconcilers, but _would_ have spawned some reconcilers if they weren't
blocked by the reconcile concurrency limit. This makes the test's
reconcile_until_idle behave as expected (i.e. not complete until the
system is nice and calm).
- Fix an issue where tenant migrations would leave a spurious secondary
location when migrated to some location that was not already their
secondary (this was an existing low-impact bug that tripped up the
test's consistency checks).

On the test with 8000 shards, the resident memory per shard is about
20KiB. This is not really per-shard memory: the primary source of memory
growth is the number of concurrent network/db clients we create.

With 8000 shards, the test takes 125s to run on my workstation.
This commit is contained in:
John Spray
2024-04-30 16:21:54 +01:00
committed by GitHub
parent 3a2f10712a
commit a74b60066c
10 changed files with 292 additions and 21 deletions

1
Cargo.lock generated
View File

@@ -1319,6 +1319,7 @@ dependencies = [
"git-version",
"hex",
"humantime",
"humantime-serde",
"hyper 0.14.26",
"nix 0.27.1",
"once_cell",

View File

@@ -17,6 +17,7 @@ nix.workspace = true
once_cell.workspace = true
postgres.workspace = true
hex.workspace = true
humantime-serde.workspace = true
hyper.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["blocking", "json"] }

View File

@@ -1554,8 +1554,8 @@ fn cli() -> Command {
Command::new("storage_controller")
.arg_required_else_help(true)
.about("Manage storage_controller")
.subcommand(Command::new("start").about("Start local pageserver").arg(pageserver_config_args.clone()))
.subcommand(Command::new("stop").about("Stop local pageserver")
.subcommand(Command::new("start").about("Start storage controller"))
.subcommand(Command::new("stop").about("Stop storage controller")
.arg(stop_mode_arg.clone()))
)
.subcommand(

View File

@@ -17,6 +17,7 @@ use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::Duration;
use utils::{
auth::{encode_from_key_file, Claims},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -66,6 +67,10 @@ pub struct LocalEnv {
pub broker: NeonBroker,
// Configuration for the storage controller (1 per neon_local environment)
#[serde(default)]
pub storage_controller: NeonStorageControllerConf,
/// This Vec must always contain at least one pageserver
pub pageservers: Vec<PageServerConf>,
@@ -98,6 +103,29 @@ pub struct NeonBroker {
pub listen_addr: SocketAddr,
}
/// Broker config for cluster internal communication.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct NeonStorageControllerConf {
/// Heartbeat timeout before marking a node offline
#[serde(with = "humantime_serde")]
pub max_unavailable: Duration,
}
impl NeonStorageControllerConf {
// Use a shorter pageserver unavailability interval than the default to speed up tests.
const DEFAULT_MAX_UNAVAILABLE_INTERVAL: std::time::Duration =
std::time::Duration::from_secs(10);
}
impl Default for NeonStorageControllerConf {
fn default() -> Self {
Self {
max_unavailable: Self::DEFAULT_MAX_UNAVAILABLE_INTERVAL,
}
}
}
// Dummy Default impl to satisfy Deserialize derive.
impl Default for NeonBroker {
fn default() -> Self {

View File

@@ -1,4 +1,7 @@
use crate::{background_process, local_env::LocalEnv};
use crate::{
background_process,
local_env::{LocalEnv, NeonStorageControllerConf},
};
use camino::{Utf8Path, Utf8PathBuf};
use hyper::Method;
use pageserver_api::{
@@ -32,15 +35,13 @@ pub struct StorageController {
public_key: Option<String>,
postgres_port: u16,
client: reqwest::Client,
config: NeonStorageControllerConf,
}
const COMMAND: &str = "storage_controller";
const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16;
// Use a shorter pageserver unavailability interval than the default to speed up tests.
const NEON_LOCAL_MAX_UNAVAILABLE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
#[derive(Serialize, Deserialize)]
pub struct AttachHookRequest {
pub tenant_shard_id: TenantShardId,
@@ -135,6 +136,7 @@ impl StorageController {
client: reqwest::ClientBuilder::new()
.build()
.expect("Failed to construct http client"),
config: env.storage_controller.clone(),
}
}
@@ -272,8 +274,6 @@ impl StorageController {
// Run migrations on every startup, in case something changed.
let database_url = self.setup_database().await?;
let max_unavailable: humantime::Duration = NEON_LOCAL_MAX_UNAVAILABLE_INTERVAL.into();
let mut args = vec![
"-l",
&self.listen,
@@ -283,7 +283,7 @@ impl StorageController {
"--database-url",
&database_url,
"--max-unavailable-interval",
&max_unavailable.to_string(),
&humantime::Duration::from(self.config.max_unavailable).to_string(),
]
.into_iter()
.map(|s| s.to_string())

View File

@@ -90,7 +90,11 @@ const INITIAL_GENERATION: Generation = Generation::new(0);
/// up on unresponsive pageservers and proceed.
pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
pub const MAX_UNAVAILABLE_INTERVAL_DEFAULT: Duration = Duration::from_secs(30);
/// How long a node may be unresponsive to heartbeats before we declare it offline.
/// This must be long enough to cover node restarts as well as normal operations: in future
/// it should be separated into distinct timeouts for startup vs. normal operation
/// (`<https://github.com/neondatabase/neon/issues/7552>`)
pub const MAX_UNAVAILABLE_INTERVAL_DEFAULT: Duration = Duration::from_secs(300);
pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
@@ -4251,7 +4255,9 @@ impl Service {
/// Check all tenants for pending reconciliation work, and reconcile those in need.
/// Additionally, reschedule tenants that require it.
///
/// Returns how many reconciliation tasks were started
/// Returns how many reconciliation tasks were started, or `1` if no reconciles were
/// spawned but some _would_ have been spawned if `reconciler_concurrency` units where
/// available. A return value of 0 indicates that everything is fully reconciled already.
fn reconcile_all(&self) -> usize {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, _scheduler) = locked.parts_mut();
@@ -4266,7 +4272,11 @@ impl Service {
}
// Skip checking if this shard is already enqueued for reconciliation
if shard.delayed_reconcile {
if shard.delayed_reconcile && self.reconciler_concurrency.available_permits() == 0 {
// If there is something delayed, then return a nonzero count so that
// callers like reconcile_all_now do not incorrectly get the impression
// that the system is in a quiescent state.
reconciles_spawned = std::cmp::max(1, reconciles_spawned);
continue;
}
@@ -4451,7 +4461,7 @@ impl Service {
waiter_count
);
Ok(waiter_count)
Ok(std::cmp::max(waiter_count, reconciles_spawned))
}
pub async fn shutdown(&self) {

View File

@@ -952,8 +952,8 @@ impl TenantShard {
/// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
///
/// This is appropriate when you can't spawn a recociler (e.g. due to resource limits), but
/// you would like to wait until one gets spawned in the background.
/// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
/// you would like to wait on the next reconciler that gets spawned in the background.
pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
self.ensure_sequence_ahead();

View File

@@ -14,10 +14,18 @@ class ComputeReconfigure:
self.server = server
self.control_plane_compute_hook_api = f"http://{server.host}:{server.port}/notify-attach"
self.workloads = {}
self.on_notify = None
def register_workload(self, workload):
self.workloads[workload.tenant_id] = workload
def register_on_notify(self, fn):
"""
Add some extra work during a notification, like sleeping to slow things down, or
logging what was notified.
"""
self.on_notify = fn
@pytest.fixture(scope="function")
def compute_reconfigure_listener(make_httpserver):
@@ -43,6 +51,9 @@ def compute_reconfigure_listener(make_httpserver):
body: dict[str, Any] = request.json
log.info(f"notify-attach request: {body}")
if self.on_notify is not None:
self.on_notify(body)
try:
workload = self.workloads[TenantId(body["tenant_id"])]
except KeyError:

View File

@@ -499,6 +499,7 @@ class NeonEnvBuilder:
self.config_init_force: Optional[str] = None
self.top_output_dir = top_output_dir
self.control_plane_compute_hook_api: Optional[str] = None
self.storage_controller_config: Optional[dict[Any, Any]] = None
self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine
@@ -1021,6 +1022,7 @@ class NeonEnv:
self.pg_distrib_dir = config.pg_distrib_dir
self.endpoint_counter = 0
self.pageserver_config_override = config.pageserver_config_override
self.storage_controller_config = config.storage_controller_config
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
@@ -1066,6 +1068,9 @@ class NeonEnv:
if self.control_plane_compute_hook_api is not None:
cfg["control_plane_compute_hook_api"] = self.control_plane_compute_hook_api
if self.storage_controller_config is not None:
cfg["storage_controller"] = self.storage_controller_config
# Create config for pageserver
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
@@ -1134,12 +1139,9 @@ class NeonEnv:
# bounce through retries on startup
self.storage_controller.start()
def storage_controller_ready():
assert self.storage_controller.ready() is True
# Wait for storage controller readiness to prevent unnecessary post start-up
# reconcile.
wait_until(30, 1, storage_controller_ready)
self.storage_controller.wait_until_ready()
# Start up broker, pageserver and all safekeepers
futs = []
@@ -2043,6 +2045,15 @@ class NeonStorageController(MetricsGetter):
else:
raise RuntimeError(f"Unexpected status {status} from readiness endpoint")
def wait_until_ready(self):
t1 = time.time()
def storage_controller_ready():
assert self.ready() is True
wait_until(30, 1, storage_controller_ready)
return time.time() - t1
def attach_hook_issue(
self, tenant_shard_id: Union[TenantId, TenantShardId], pageserver_id: int
) -> int:
@@ -2130,7 +2141,7 @@ class NeonStorageController(MetricsGetter):
shard_count: Optional[int] = None,
shard_stripe_size: Optional[int] = None,
tenant_config: Optional[Dict[Any, Any]] = None,
placement_policy: Optional[str] = None,
placement_policy: Optional[Union[Dict[Any, Any] | str]] = None,
):
"""
Use this rather than pageserver_api() when you need to include shard parameters
@@ -2240,10 +2251,21 @@ class NeonStorageController(MetricsGetter):
def reconcile_until_idle(self, timeout_secs=30):
start_at = time.time()
n = 1
delay_sec = 0.5
delay_max = 5
while n > 0:
n = self.reconcile_all()
if time.time() - start_at > timeout_secs:
if n == 0:
break
elif time.time() - start_at > timeout_secs:
raise RuntimeError("Timeout in reconcile_until_idle")
else:
# Don't call again right away: if we're waiting for many reconciles that
# are blocked on the concurrency limit, it slows things down to call
# reconcile_all frequently.
time.sleep(delay_sec)
delay_sec *= 2
delay_sec = min(delay_sec, delay_max)
def consistency_check(self):
"""

View File

@@ -0,0 +1,198 @@
import concurrent.futures
import random
import time
import pytest
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pg_version import PgVersion
from fixtures.types import TenantId, TenantShardId, TimelineId
@pytest.mark.timeout(3600) # super long running test: should go down as we optimize
def test_storage_controller_many_tenants(
neon_env_builder: NeonEnvBuilder, compute_reconfigure_listener: ComputeReconfigure
):
"""
Check that we cope well with a not-totally-trivial number of tenants.
This is checking for:
- Obvious concurrency bugs from issuing many tenant creations/modifications
concurrently.
- Obvious scaling bugs like O(N^2) scaling that would be so slow that even
a basic test starts failing from slowness.
This is _not_ a comprehensive scale test: just a basic sanity check that
we don't fall over for a thousand shards.
"""
neon_env_builder.num_pageservers = 5
neon_env_builder.storage_controller_config = {
# Default neon_local uses a small timeout: use a longer one to tolerate longer pageserver restarts.
# TODO: tune this down as restarts get faster (https://github.com/neondatabase/neon/pull/7553), to
# guard against regressions in restart time.
"max_unavailable": "300s"
}
neon_env_builder.control_plane_compute_hook_api = (
compute_reconfigure_listener.control_plane_compute_hook_api
)
# A small sleep on each call into the notify hook, to simulate the latency of doing a database write
compute_reconfigure_listener.register_on_notify(lambda body: time.sleep(0.01))
env = neon_env_builder.init_start()
# We will intentionally stress reconciler concurrrency, which triggers a warning when lots
# of shards are hitting the delayed path.
env.storage_controller.allowed_errors.append(".*Many shards are waiting to reconcile")
for ps in env.pageservers:
# This can happen because when we do a loop over all pageservers and mark them offline/active,
# reconcilers might get cancelled, and the next reconcile can follow a not-so-elegant path of
# bumping generation before other attachments are detached.
#
# We could clean this up by making reconcilers respect the .observed of their predecessor, if
# we spawn with a wait for the predecessor.
ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
# Storage controller is allowed to drop pageserver requests when the cancellation token
# for a Reconciler fires.
ps.allowed_errors.append(".*request was dropped before completing.*")
# Total tenants
tenant_count = 4000
# Shards per tenant
shard_count = 2
stripe_size = 1024
tenants = set(TenantId.generate() for _i in range(0, tenant_count))
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
def check_memory():
# Shards should be cheap_ in memory, as we will have very many of them
expect_memory_per_shard = 128 * 1024
rss = env.storage_controller.get_metric_value("process_resident_memory_bytes")
assert rss is not None
log.info(f"Resident memory: {rss} ({ rss / (shard_count * tenant_count)} per shard)")
assert rss < expect_memory_per_shard * shard_count * tenant_count
# We use a fixed seed to make the test somewhat reproducible: we want a randomly
# chosen order in the sense that it's arbitrary, but not in the sense that it should change every run.
rng = random.Random(1234)
# Issue more concurrent operations than the storage controller's reconciler concurrency semaphore
# permits, to ensure that we are exercising stressing that.
api_concurrency = 135
# We will create tenants directly via API, not via neon_local, to avoid any false
# serialization of operations in neon_local (it e.g. loads/saves a config file on each call)
with concurrent.futures.ThreadPoolExecutor(max_workers=api_concurrency) as executor:
futs = []
t1 = time.time()
for tenant_id in tenants:
f = executor.submit(
env.storage_controller.tenant_create,
tenant_id,
shard_count,
stripe_size,
placement_policy={"Attached": 1},
)
futs.append(f)
# Wait for creations to finish
for f in futs:
f.result()
log.info(
f"Created {len(tenants)} tenants in {time.time() - t1}, {len(tenants) / (time.time() - t1)}/s"
)
run_ops = api_concurrency * 4
assert run_ops < len(tenants)
op_tenants = list(tenants)[0:run_ops]
# Generate a mixture of operations and dispatch them all concurrently
futs = []
for tenant_id in op_tenants:
op = rng.choice([0, 1, 2])
if op == 0:
# A fan-out write operation to all shards in a tenant (timeline creation)
f = executor.submit(
virtual_ps_http.timeline_create,
PgVersion.NOT_SET,
tenant_id,
TimelineId.generate(),
)
elif op == 1:
# A reconciler operation: migrate a shard.
shard_number = rng.randint(0, shard_count - 1)
tenant_shard_id = TenantShardId(tenant_id, shard_number, shard_count)
dest_ps_id = rng.choice([ps.id for ps in env.pageservers])
f = executor.submit(
env.storage_controller.tenant_shard_migrate, tenant_shard_id, dest_ps_id
)
elif op == 2:
# A passthrough read to shard zero
f = executor.submit(virtual_ps_http.tenant_status, tenant_id)
futs.append(f)
# Wait for mixed ops to finish
for f in futs:
f.result()
# Consistency check is safe here: all the previous operations waited for reconcile before completing
env.storage_controller.consistency_check()
check_memory()
# This loop waits for reconcile_all to indicate no pending work, and then calls it once more to time
# how long the call takes when idle: this iterates over shards while doing no I/O and should be reliably fast: if
# it isn't, that's a sign that we have made some algorithmic mistake (e.g. O(N**2) scheduling)
#
# We do not require that the system is quiescent already here, although at present in this point in the test
# that may be the case.
while True:
t1 = time.time()
reconcilers = env.storage_controller.reconcile_all()
if reconcilers == 0:
# Time how long a no-op background reconcile takes: this measures how long it takes to
# loop over all the shards looking for work to do.
runtime = time.time() - t1
log.info(f"No-op call to reconcile_all took {runtime}s")
assert runtime < 1
break
# Restart the storage controller
env.storage_controller.stop()
env.storage_controller.start()
# See how long the controller takes to pass its readiness check. This should be fast because
# all the nodes are online: offline pageservers are the only thing that's allowed to delay
# startup.
readiness_period = env.storage_controller.wait_until_ready()
assert readiness_period < 5
# Consistency check is safe here: the storage controller's restart should not have caused any reconcilers
# to run, as it was in a stable state before restart. If it did, that's a bug.
env.storage_controller.consistency_check()
check_memory()
# Restart pageservers: this exercises the /re-attach API
for pageserver in env.pageservers:
pageserver.stop()
pageserver.start()
# Consistency check is safe here: restarting pageservers should not have caused any Reconcilers to spawn,
# as they were not offline long enough to trigger any scheduling changes.
env.storage_controller.consistency_check()
check_memory()
# Stop the storage controller before tearing down fixtures, because it otherwise might log
# errors trying to call our `ComputeReconfigure`.
env.storage_controller.stop()