Compare commits


3 Commits

Author SHA1 Message Date
Folke Behrens
a7946dffec neon-image: Add debugging tools to image 2025-03-24 19:52:27 +01:00
Anastasia Lubennikova
3e5884ff01 Revert "feat(compute_ctl): allow to change audit_log_level for existing (#11308)" (#11343)

This reverts commit e5aef3747c.

The logic of this commit was incorrect:
enabling audit requires a restart of the compute,
because the audit extensions are loaded via shared_preload_libraries.
So it cannot be done in the configuration phase
and requires an endpoint restart instead.
2025-03-21 18:09:34 +00:00
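
The constraint behind the revert above can be made concrete with a minimal, illustrative sketch. This is not the actual compute_ctl code: the `ComputeAudit` enum below is a local stand-in for the spec's audit level type, and `needs_endpoint_restart` is a hypothetical helper. The point it illustrates is that enabling audit adds the pgaudit extensions to `shared_preload_libraries`, which PostgreSQL only reads at server start, so a configuration-phase reload cannot pick it up.

```rust
// Minimal sketch, assuming a stand-in for the spec's audit level enum.
// Enabling audit (Disabled -> Log/Hipaa) adds pgaudit (and, for Hipaa,
// pgauditlogtofile) to shared_preload_libraries, which PostgreSQL only
// reads at server start, so a plain `pg_ctl reload` is not enough.
#[derive(PartialEq)]
enum ComputeAudit {
    Disabled,
    Log,
    Hipaa,
}

/// Hypothetical helper: returns true when the spec change needs a full
/// endpoint restart rather than an in-place configuration pass.
fn needs_endpoint_restart(old: &ComputeAudit, new: &ComputeAudit) -> bool {
    *old == ComputeAudit::Disabled && *new != ComputeAudit::Disabled
}

fn main() {
    assert!(needs_endpoint_restart(&ComputeAudit::Disabled, &ComputeAudit::Hipaa));
    assert!(!needs_endpoint_restart(&ComputeAudit::Log, &ComputeAudit::Log));
}
```

Accordingly, the revert removes the `apply_config_non_skippable` / `apply_spec_sql_non_skippable` path that tried to apply the audit phases during configuration, as the compute_ctl diffs below show.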
Vlad Lazar
9fc7c22cc9 storcon: add use_local_compute_notifications flag (#11333)
## Problem

While working on bulk import, I want to use the `control-plane-url` flag
for a different request.
Currently, the local compute hook is used whenever no control plane is
specified in the config. My test requires local compute notifications
together with a configured `control-plane-url`, which isn't supported.

## Summary of changes

Add a `use-local-compute-notifications` flag. When this is set, we use
the local flow regardless of other config values (see the sketch after
this commit). It's enabled by default in neon_local and disabled by
default in all other environments. I had to turn the flag off in tests
that wish to bypass the local flow, but that's expected.

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2025-03-21 15:31:06 +00:00
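
For readers skimming the diffs, here is a simplified sketch of the notification routing decision the new flag introduces. It is not the storage controller's actual `ComputeHook` implementation: the `Config` struct, the `notify_target` helper, and the example URL are illustrative stand-ins, though the URL-joining behaviour mirrors the compute hook diff below. When `use_local_compute_notifications` is set, the local flow is used regardless of `control-plane-url` or `compute-hook-url`; otherwise the notify-attach URL is derived from the control plane URL, falling back to an explicitly configured compute hook URL.

```rust
/// Illustrative stand-in for the relevant storage controller settings.
struct Config {
    use_local_compute_notifications: bool,
    control_plane_url: Option<String>,
    compute_hook_url: Option<String>,
}

/// Hypothetical helper: `None` means "deliver the notification via the
/// local flow"; `Some(url)` is the remote notify-attach endpoint.
fn notify_target(config: &Config) -> Option<String> {
    if config.use_local_compute_notifications {
        // neon_local default: notify the locally managed compute directly,
        // regardless of any configured control plane.
        return None;
    }
    // Otherwise derive the notify-attach URL from the control plane URL,
    // falling back to an explicitly configured compute hook URL.
    if let Some(url) = &config.control_plane_url {
        Some(if url.ends_with('/') {
            format!("{url}notify-attach")
        } else {
            format!("{url}/notify-attach")
        })
    } else {
        config.compute_hook_url.clone()
    }
}

fn main() {
    let cfg = Config {
        use_local_compute_notifications: false,
        // Hypothetical control plane URL, for illustration only.
        control_plane_url: Some("http://127.0.0.1:1234/upcall/v1".to_string()),
        compute_hook_url: None,
    };
    assert_eq!(
        notify_target(&cfg).as_deref(),
        Some("http://127.0.0.1:1234/upcall/v1/notify-attach")
    );
}
```

In the diffs that follow, the flag defaults to true in `NeonStorageControllerConf`, is rejected by the storage controller CLI outside `--dev` mode, and is set to `use_local_compute_notifications: False` in the Python tests that route notifications through their own HTTP handlers instead.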
15 changed files with 86 additions and 138 deletions

View File

@@ -103,12 +103,17 @@ RUN set -e \
&& echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries \
&& apt update \
&& apt install -y \
bpftrace \
ca-certificates \
libreadline-dev \
libseccomp-dev \
ca-certificates \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
iproute2 \
lsof \
openssl \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
screen \
tcpdump \
&& rm -f /etc/apt/apt.conf.d/80-retries \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
&& useradd -d /data neon \

View File

@@ -1497,27 +1497,6 @@ impl ComputeNode {
Ok::<(), anyhow::Error>(())
}
/// Apply config operations that are not covered by `skip_pg_catalog_updates`
#[instrument(skip_all)]
pub fn apply_config_non_skippable(&self, compute_state: &ComputeState) -> Result<()> {
let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
let conf = Arc::new(conf);
let spec = Arc::new(
compute_state
.pspec
.as_ref()
.expect("spec must be set")
.spec
.clone(),
);
// Merge-apply spec & changes to PostgreSQL state.
self.apply_spec_sql_non_skippable(spec.clone(), conf.clone())?;
Ok::<(), anyhow::Error>(())
}
// Wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop.
#[instrument(skip_all)]
@@ -1640,24 +1619,8 @@ impl ComputeNode {
"updated postgresql.conf to set neon.disable_logical_replication_subscribers=false"
);
}
self.pg_reload_conf()?;
} else {
// We need to run some operations even if skip_pg_catalog_updates is set
let pgdata_path = Path::new(&self.params.pgdata);
// temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are applying config:
// creating new extensions, roles, etc...
config::with_compute_ctl_tmp_override(pgdata_path, "neon.max_cluster_size=-1", || {
self.pg_reload_conf()?;
self.apply_config_non_skippable(compute_state)?;
Ok(())
})?;
self.pg_reload_conf()?;
}
self.post_apply_config()?;
Ok(())

View File

@@ -308,75 +308,6 @@ impl ComputeNode {
Ok(())
}
// Similar to apply_spec_sql, but for the simplified set of operations
// that we perform even when `pg_skip_catalog_updates` is set.
//
// Keep the list of operations as small as possible,
// as it will be run on every spec change and affect compute start time.
pub fn apply_spec_sql_non_skippable(
&self,
spec: Arc<ComputeSpec>,
conf: Arc<tokio_postgres::Config>,
) -> Result<()> {
info!("Applying non_skippable config",);
debug!("Config: {:?}", spec);
let rt = tokio::runtime::Handle::current();
rt.block_on(async {
let client = Self::get_maintenance_client(&conf).await?;
let spec = spec.clone();
let jwks_roles = Arc::new(
spec.as_ref()
.local_proxy_config
.iter()
.flat_map(|it| &it.jwks)
.flatten()
.flat_map(|setting| &setting.role_names)
.cloned()
.collect::<HashSet<_>>(),
);
// NOTE: Here we assume that operations below don't use ctx
// TODO: refactor apply_operations() to accept ctx as option.
let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
roles: HashMap::new(),
dbs: HashMap::new(),
}));
let mut phases = vec![];
match spec.audit_log_level {
ComputeAudit::Hipaa => {
phases.push(CreatePgauditExtension);
phases.push(CreatePgauditlogtofileExtension);
phases.push(DisablePostgresDBPgAudit);
}
ComputeAudit::Log => {
phases.push(CreatePgauditExtension);
phases.push(DisablePostgresDBPgAudit);
}
ComputeAudit::Disabled => {}
}
for phase in phases {
debug!("Applying phase {:?}", &phase);
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
phase,
|| async { Ok(&client) },
)
.await?;
}
Ok::<(), anyhow::Error>(())
})?;
Ok(())
}
/// Apply SQL migrations of the RunInEachDatabase phase.
///
/// May opt to not connect to databases that don't have any scheduled

View File

@@ -184,6 +184,8 @@ pub struct NeonStorageControllerConf {
pub timelines_onto_safekeepers: bool,
pub use_https_safekeeper_api: bool,
pub use_local_compute_notifications: bool,
}
impl NeonStorageControllerConf {
@@ -213,6 +215,7 @@ impl Default for NeonStorageControllerConf {
use_https_pageserver_api: false,
timelines_onto_safekeepers: false,
use_https_safekeeper_api: false,
use_local_compute_notifications: true,
}
}
}

View File

@@ -555,6 +555,10 @@ impl StorageController {
args.push("--use-https-safekeeper-api".to_string());
}
if self.config.use_local_compute_notifications {
args.push("--use-local-compute-notifications".to_string());
}
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
}

View File

@@ -624,16 +624,19 @@ impl ComputeHook {
MaybeSendResult::Transmit((request, lock)) => (request, lock),
};
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
let result = if !self.config.use_local_compute_notifications {
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
} else {
format!("{control_plane_url}/notify-attach")
})
} else {
format!("{control_plane_url}/notify-attach")
})
} else {
self.config.compute_hook_url.clone()
};
let result = if let Some(notify_url) = &compute_hook_url {
self.config.compute_hook_url.clone()
};
// We validate this at startup
let notify_url = compute_hook_url.as_ref().unwrap();
self.do_notify(notify_url, &request, cancel).await
} else {
self.do_notify_local(&request).await.map_err(|e| {

View File

@@ -203,6 +203,11 @@ struct Cli {
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<PathBuf>,
/// Neon local specific flag. When set, ignore [`Cli::control_plane_url`] and deliver
/// the compute notification directly (instead of via control plane).
#[arg(long, default_value = "false")]
use_local_compute_notifications: bool,
}
enum StrictMode {
@@ -368,6 +373,9 @@ async fn async_main() -> anyhow::Result<()> {
"neither `--compute-hook-url` nor `--control-plane-url` are set: this is only permitted in `--dev` mode"
);
}
StrictMode::Strict if args.use_local_compute_notifications => {
anyhow::bail!("`--use-local-compute-notifications` is only permitted in `--dev` mode");
}
StrictMode::Strict => {
tracing::info!("Starting in strict mode: configuration is OK.")
}
@@ -427,6 +435,7 @@ async fn async_main() -> anyhow::Result<()> {
use_https_safekeeper_api: args.use_https_safekeeper_api,
ssl_ca_certs,
timelines_onto_safekeepers: args.timelines_onto_safekeepers,
use_local_compute_notifications: args.use_local_compute_notifications,
};
// Validate that we can connect to the database

View File

@@ -448,6 +448,8 @@ pub struct Config {
pub ssl_ca_certs: Vec<Certificate>,
pub timelines_onto_safekeepers: bool,
pub use_local_compute_notifications: bool,
}
impl From<DatabaseError> for ApiError {

View File

@@ -1169,6 +1169,12 @@ class NeonEnv:
if storage_controller_config is not None:
cfg["storage_controller"] = storage_controller_config
if config.test_may_use_compatibility_snapshot_binaries:
if "storage_controller" in cfg:
cfg["storage_controller"]["use_local_compute_notifications"] = False
else:
cfg["storage_controller"] = {"use_local_compute_notifications": False}
# Create config for pageserver
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"

View File

@@ -82,6 +82,7 @@ def test_storage_controller_many_tenants(
# guard against regressions in restart time.
"max_offline": "30s",
"max_warming_up": "300s",
"use_local_compute_notifications": False,
}
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api

View File

@@ -5,11 +5,9 @@ import asyncio
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.remote_storage import RemoteStorageKind
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
def test_change_pageserver(neon_env_builder: NeonEnvBuilder, make_httpserver):
def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
"""
A relatively low level test of reconfiguring a compute's pageserver at runtime. Usually this
is all done via the storage controller, but this test will disable the storage controller's compute
@@ -23,19 +21,6 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder, make_httpserver):
)
env = neon_env_builder.init_start()
neon_env_builder.control_plane_hooks_api = (
f"http://{make_httpserver.host}:{make_httpserver.port}/"
)
def ignore_notify(request: Request):
# This test does direct updates to compute configuration: disable the storage controller's notification
log.info(f"Ignoring storage controller compute notification: {request.json}")
return Response(status=200)
make_httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(
ignore_notify
)
env.create_branch("test_change_pageserver")
endpoint = env.endpoints.create_start("test_change_pageserver")

View File

@@ -12,6 +12,7 @@ import fixtures.utils
import pytest
import toml
from fixtures.common_types import TenantId, TimelineId
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
@@ -592,17 +593,22 @@ def test_historic_storage_formats(
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
@pytest.mark.parametrize(
**fixtures.utils.allpairs_versions(),
)
def test_versions_mismatch(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_version: PgVersion,
compatibility_snapshot_dir,
compute_reconfigure_listener: ComputeReconfigure,
combination,
):
"""
Checks compatibility of different combinations of versions of the components
"""
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.from_repo_dir(
compatibility_snapshot_dir / "repo",

View File

@@ -91,6 +91,8 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
f"http://{make_httpserver.host}:{make_httpserver.port}/"
)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
def ignore_notify(request: Request):
# This test does all its own compute configuration (by passing explicit pageserver ID to Workload functions),
# so we send controller notifications to /dev/null to prevent it fighting the test for control of the compute.

View File

@@ -808,6 +808,8 @@ def test_sharding_split_stripe_size(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
env = neon_env_builder.init_start(
initial_tenant_shard_count=1, initial_tenant_shard_stripe_size=initial_stripe_size
)
@@ -1316,6 +1318,11 @@ def test_sharding_split_failures(
initial_shard_count = 2
split_shard_count = 4
neon_env_builder.storage_controller_config = {
# Route to `compute_reconfigure_listener` instead
"use_local_compute_notifications": False,
}
env = neon_env_builder.init_configs()
env.start()

View File

@@ -73,7 +73,9 @@ def get_node_shard_counts(env: NeonEnv, tenant_ids):
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination):
def test_storage_controller_smoke(
neon_env_builder: NeonEnvBuilder, compute_reconfigure_listener: ComputeReconfigure, combination
):
"""
Test the basic lifecycle of a storage controller:
- Restarting
@@ -83,6 +85,7 @@ def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination)
"""
neon_env_builder.num_pageservers = 3
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
env = neon_env_builder.init_configs()
# Start services by hand so that we can skip a pageserver (this will start + register later)
@@ -620,6 +623,8 @@ def test_storage_controller_compute_hook(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
@@ -738,6 +743,8 @@ def test_storage_controller_stuck_compute_hook(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
@@ -885,6 +892,8 @@ def test_storage_controller_compute_hook_retry(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_configs()
env.start()
@@ -1008,6 +1017,8 @@ def test_storage_controller_compute_hook_revert(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
tenant_id = env.initial_tenant
@@ -1398,6 +1409,11 @@ def test_storage_controller_tenant_deletion(
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
neon_env_builder.storage_controller_config = {
# Route to `compute_reconfigure_listener` instead
"use_local_compute_notifications": False,
}
env = neon_env_builder.init_configs()
env.start()
@@ -2176,7 +2192,12 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
@pytest.mark.parametrize("num_azs", [1, 2])
def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder, num_azs: int, combination):
def test_graceful_cluster_restart(
neon_env_builder: NeonEnvBuilder,
num_azs: int,
compute_reconfigure_listener: ComputeReconfigure,
combination,
):
"""
Graceful reststart of storage controller clusters use the drain and
fill hooks in order to migrate attachments away from pageservers before
@@ -2188,6 +2209,7 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder, num_azs: int
"""
neon_env_builder.num_azs = num_azs
neon_env_builder.num_pageservers = 2
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
env = neon_env_builder.init_configs()
env.start()
@@ -2443,7 +2465,6 @@ def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("while_offline", [True, False])
def test_storage_controller_node_deletion(
neon_env_builder: NeonEnvBuilder,
compute_reconfigure_listener: ComputeReconfigure,
while_offline: bool,
):
"""