storcon: add use_local_compute_notifications flag (#11333)

## Problem

While working on bulk import, I want to use the `control-plane-url` flag
for a different request.
Currently, the local compute hook is used whenever no control plane is
specified in the config.
My test requires local compute notifications and a configured
`control-plane-url` which isn't supported.

## Summary of changes

Add a `use-local-compute-notifications` flag. When this is set, we use
the local flow regardless of other config values.
It's enabled by default in neon_local and disabled by default in all
other envs. I had to turn the flag off in tests
that wish to bypass the local flow, but that's expected.

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
This commit is contained in:
Vlad Lazar
2025-03-21 15:31:06 +00:00
committed by GitHub
parent 23ad228310
commit 9fc7c22cc9
12 changed files with 78 additions and 29 deletions

View File

@@ -184,6 +184,8 @@ pub struct NeonStorageControllerConf {
pub timelines_onto_safekeepers: bool,
pub use_https_safekeeper_api: bool,
pub use_local_compute_notifications: bool,
}
impl NeonStorageControllerConf {
@@ -213,6 +215,7 @@ impl Default for NeonStorageControllerConf {
use_https_pageserver_api: false,
timelines_onto_safekeepers: false,
use_https_safekeeper_api: false,
use_local_compute_notifications: true,
}
}
}

View File

@@ -555,6 +555,10 @@ impl StorageController {
args.push("--use-https-safekeeper-api".to_string());
}
if self.config.use_local_compute_notifications {
args.push("--use-local-compute-notifications".to_string());
}
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
}

View File

@@ -624,16 +624,19 @@ impl ComputeHook {
MaybeSendResult::Transmit((request, lock)) => (request, lock),
};
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
let result = if !self.config.use_local_compute_notifications {
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
} else {
format!("{control_plane_url}/notify-attach")
})
} else {
format!("{control_plane_url}/notify-attach")
})
} else {
self.config.compute_hook_url.clone()
};
let result = if let Some(notify_url) = &compute_hook_url {
self.config.compute_hook_url.clone()
};
// We validate this at startup
let notify_url = compute_hook_url.as_ref().unwrap();
self.do_notify(notify_url, &request, cancel).await
} else {
self.do_notify_local(&request).await.map_err(|e| {

View File

@@ -203,6 +203,11 @@ struct Cli {
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<PathBuf>,
/// Neon local specific flag. When set, ignore [`Cli::control_plane_url`] and deliver
/// the compute notification directly (instead of via control plane).
#[arg(long, default_value = "false")]
use_local_compute_notifications: bool,
}
enum StrictMode {
@@ -368,6 +373,9 @@ async fn async_main() -> anyhow::Result<()> {
"neither `--compute-hook-url` nor `--control-plane-url` are set: this is only permitted in `--dev` mode"
);
}
StrictMode::Strict if args.use_local_compute_notifications => {
anyhow::bail!("`--use-local-compute-notifications` is only permitted in `--dev` mode");
}
StrictMode::Strict => {
tracing::info!("Starting in strict mode: configuration is OK.")
}
@@ -427,6 +435,7 @@ async fn async_main() -> anyhow::Result<()> {
use_https_safekeeper_api: args.use_https_safekeeper_api,
ssl_ca_certs,
timelines_onto_safekeepers: args.timelines_onto_safekeepers,
use_local_compute_notifications: args.use_local_compute_notifications,
};
// Validate that we can connect to the database

View File

@@ -448,6 +448,8 @@ pub struct Config {
pub ssl_ca_certs: Vec<Certificate>,
pub timelines_onto_safekeepers: bool,
pub use_local_compute_notifications: bool,
}
impl From<DatabaseError> for ApiError {

View File

@@ -1169,6 +1169,12 @@ class NeonEnv:
if storage_controller_config is not None:
cfg["storage_controller"] = storage_controller_config
if config.test_may_use_compatibility_snapshot_binaries:
if "storage_controller" in cfg:
cfg["storage_controller"]["use_local_compute_notifications"] = False
else:
cfg["storage_controller"] = {"use_local_compute_notifications": False}
# Create config for pageserver
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"

View File

@@ -82,6 +82,7 @@ def test_storage_controller_many_tenants(
# guard against regressions in restart time.
"max_offline": "30s",
"max_warming_up": "300s",
"use_local_compute_notifications": False,
}
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api

View File

@@ -5,11 +5,9 @@ import asyncio
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.remote_storage import RemoteStorageKind
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
def test_change_pageserver(neon_env_builder: NeonEnvBuilder, make_httpserver):
def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
"""
A relatively low level test of reconfiguring a compute's pageserver at runtime. Usually this
is all done via the storage controller, but this test will disable the storage controller's compute
@@ -23,19 +21,6 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder, make_httpserver):
)
env = neon_env_builder.init_start()
neon_env_builder.control_plane_hooks_api = (
f"http://{make_httpserver.host}:{make_httpserver.port}/"
)
def ignore_notify(request: Request):
# This test does direct updates to compute configuration: disable the storage controller's notification
log.info(f"Ignoring storage controller compute notification: {request.json}")
return Response(status=200)
make_httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(
ignore_notify
)
env.create_branch("test_change_pageserver")
endpoint = env.endpoints.create_start("test_change_pageserver")

View File

@@ -12,6 +12,7 @@ import fixtures.utils
import pytest
import toml
from fixtures.common_types import TenantId, TimelineId
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
@@ -592,17 +593,22 @@ def test_historic_storage_formats(
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
@pytest.mark.parametrize(
**fixtures.utils.allpairs_versions(),
)
def test_versions_mismatch(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_version: PgVersion,
compatibility_snapshot_dir,
compute_reconfigure_listener: ComputeReconfigure,
combination,
):
"""
Checks compatibility of different combinations of versions of the components
"""
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.from_repo_dir(
compatibility_snapshot_dir / "repo",

View File

@@ -91,6 +91,8 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
f"http://{make_httpserver.host}:{make_httpserver.port}/"
)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
def ignore_notify(request: Request):
# This test does all its own compute configuration (by passing explicit pageserver ID to Workload functions),
# so we send controller notifications to /dev/null to prevent it fighting the test for control of the compute.

View File

@@ -808,6 +808,8 @@ def test_sharding_split_stripe_size(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
env = neon_env_builder.init_start(
initial_tenant_shard_count=1, initial_tenant_shard_stripe_size=initial_stripe_size
)
@@ -1316,6 +1318,11 @@ def test_sharding_split_failures(
initial_shard_count = 2
split_shard_count = 4
neon_env_builder.storage_controller_config = {
# Route to `compute_reconfigure_listener` instead
"use_local_compute_notifications": False,
}
env = neon_env_builder.init_configs()
env.start()

View File

@@ -73,7 +73,9 @@ def get_node_shard_counts(env: NeonEnv, tenant_ids):
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination):
def test_storage_controller_smoke(
neon_env_builder: NeonEnvBuilder, compute_reconfigure_listener: ComputeReconfigure, combination
):
"""
Test the basic lifecycle of a storage controller:
- Restarting
@@ -83,6 +85,7 @@ def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination)
"""
neon_env_builder.num_pageservers = 3
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
env = neon_env_builder.init_configs()
# Start services by hand so that we can skip a pageserver (this will start + register later)
@@ -620,6 +623,8 @@ def test_storage_controller_compute_hook(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
@@ -738,6 +743,8 @@ def test_storage_controller_stuck_compute_hook(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
@@ -885,6 +892,8 @@ def test_storage_controller_compute_hook_retry(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_configs()
env.start()
@@ -1008,6 +1017,8 @@ def test_storage_controller_compute_hook_revert(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
tenant_id = env.initial_tenant
@@ -1398,6 +1409,11 @@ def test_storage_controller_tenant_deletion(
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
neon_env_builder.storage_controller_config = {
# Route to `compute_reconfigure_listener` instead
"use_local_compute_notifications": False,
}
env = neon_env_builder.init_configs()
env.start()
@@ -2176,7 +2192,12 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
@pytest.mark.parametrize("num_azs", [1, 2])
def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder, num_azs: int, combination):
def test_graceful_cluster_restart(
neon_env_builder: NeonEnvBuilder,
num_azs: int,
compute_reconfigure_listener: ComputeReconfigure,
combination,
):
"""
Graceful reststart of storage controller clusters use the drain and
fill hooks in order to migrate attachments away from pageservers before
@@ -2188,6 +2209,7 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder, num_azs: int
"""
neon_env_builder.num_azs = num_azs
neon_env_builder.num_pageservers = 2
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
env = neon_env_builder.init_configs()
env.start()
@@ -2443,7 +2465,6 @@ def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("while_offline", [True, False])
def test_storage_controller_node_deletion(
neon_env_builder: NeonEnvBuilder,
compute_reconfigure_listener: ComputeReconfigure,
while_offline: bool,
):
"""