Compare commits

..

2 Commits

Author SHA1 Message Date
Anastasia Lubennikova
15b06a6a1e compute: add non-skippable apply config operations.
apply_config() step of compute start is controlled by skip_pg_catalog_updates flag,
this is a performance optimization to decrease compute startup time, but it introduces extra dependency on cplane.
Introduce small subset of operations that we run always, independent from this flag.
2025-03-21 15:27:35 +00:00
Anastasia Lubennikova
ff87b51300 Revert "feat(compute_ctl): allow to change audit_log_level for existing (#11308)"
This reverts commit e5aef3747c.

The logic of this commit was incorrect:
enabling audit requires a restart of the compute,
because audit extensions use shared_preload_libraries.
So it cabnnot be done in the configuration phase,
require endpoint restart instead.
2025-03-21 15:21:22 +00:00
15 changed files with 138 additions and 86 deletions

View File

@@ -103,17 +103,12 @@ RUN set -e \
&& echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries \
&& apt update \
&& apt install -y \
bpftrace \
ca-certificates \
libreadline-dev \
libseccomp-dev \
iproute2 \
lsof \
openssl \
# System postgres for use with client libraries (e.g. in storage controller)
ca-certificates \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
screen \
tcpdump \
openssl \
&& rm -f /etc/apt/apt.conf.d/80-retries \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
&& useradd -d /data neon \

View File

@@ -1497,6 +1497,27 @@ impl ComputeNode {
Ok::<(), anyhow::Error>(())
}
/// Apply config operations that are not covered by `skip_pg_catalog_updates`
#[instrument(skip_all)]
pub fn apply_config_non_skippable(&self, compute_state: &ComputeState) -> Result<()> {
let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
let conf = Arc::new(conf);
let spec = Arc::new(
compute_state
.pspec
.as_ref()
.expect("spec must be set")
.spec
.clone(),
);
// Merge-apply spec & changes to PostgreSQL state.
self.apply_spec_sql_non_skippable(spec.clone(), conf.clone())?;
Ok::<(), anyhow::Error>(())
}
// Wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop.
#[instrument(skip_all)]
@@ -1619,8 +1640,24 @@ impl ComputeNode {
"updated postgresql.conf to set neon.disable_logical_replication_subscribers=false"
);
}
self.pg_reload_conf()?;
} else {
// We need to run some operations even if skip_pg_catalog_updates is set
let pgdata_path = Path::new(&self.params.pgdata);
// temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are applying config:
// creating new extensions, roles, etc...
config::with_compute_ctl_tmp_override(pgdata_path, "neon.max_cluster_size=-1", || {
self.pg_reload_conf()?;
self.apply_config_non_skippable(compute_state)?;
Ok(())
})?;
self.pg_reload_conf()?;
}
self.post_apply_config()?;
Ok(())

View File

@@ -308,6 +308,75 @@ impl ComputeNode {
Ok(())
}
// Similar to apply_spec_sql, but for the simplified set of operations
// that we perform even when `pg_skip_catalog_updates` is set.
//
// Keep the list of operations as small as possible,
// as it will be run on every spec change and affect compute start time.
pub fn apply_spec_sql_non_skippable(
&self,
spec: Arc<ComputeSpec>,
conf: Arc<tokio_postgres::Config>,
) -> Result<()> {
info!("Applying non_skippable config",);
debug!("Config: {:?}", spec);
let rt = tokio::runtime::Handle::current();
rt.block_on(async {
let client = Self::get_maintenance_client(&conf).await?;
let spec = spec.clone();
let jwks_roles = Arc::new(
spec.as_ref()
.local_proxy_config
.iter()
.flat_map(|it| &it.jwks)
.flatten()
.flat_map(|setting| &setting.role_names)
.cloned()
.collect::<HashSet<_>>(),
);
// NOTE: Here we assume that operations below don't use ctx
// TODO: refactor apply_operations() to accept ctx as option.
let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
roles: HashMap::new(),
dbs: HashMap::new(),
}));
let mut phases = vec![];
match spec.audit_log_level {
ComputeAudit::Hipaa => {
phases.push(CreatePgauditExtension);
phases.push(CreatePgauditlogtofileExtension);
phases.push(DisablePostgresDBPgAudit);
}
ComputeAudit::Log => {
phases.push(CreatePgauditExtension);
phases.push(DisablePostgresDBPgAudit);
}
ComputeAudit::Disabled => {}
}
for phase in phases {
debug!("Applying phase {:?}", &phase);
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
phase,
|| async { Ok(&client) },
)
.await?;
}
Ok::<(), anyhow::Error>(())
})?;
Ok(())
}
/// Apply SQL migrations of the RunInEachDatabase phase.
///
/// May opt to not connect to databases that don't have any scheduled

View File

@@ -184,8 +184,6 @@ pub struct NeonStorageControllerConf {
pub timelines_onto_safekeepers: bool,
pub use_https_safekeeper_api: bool,
pub use_local_compute_notifications: bool,
}
impl NeonStorageControllerConf {
@@ -215,7 +213,6 @@ impl Default for NeonStorageControllerConf {
use_https_pageserver_api: false,
timelines_onto_safekeepers: false,
use_https_safekeeper_api: false,
use_local_compute_notifications: true,
}
}
}

View File

@@ -555,10 +555,6 @@ impl StorageController {
args.push("--use-https-safekeeper-api".to_string());
}
if self.config.use_local_compute_notifications {
args.push("--use-local-compute-notifications".to_string());
}
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
}

View File

@@ -624,19 +624,16 @@ impl ComputeHook {
MaybeSendResult::Transmit((request, lock)) => (request, lock),
};
let result = if !self.config.use_local_compute_notifications {
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
} else {
format!("{control_plane_url}/notify-attach")
})
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
} else {
self.config.compute_hook_url.clone()
};
// We validate this at startup
let notify_url = compute_hook_url.as_ref().unwrap();
format!("{control_plane_url}/notify-attach")
})
} else {
self.config.compute_hook_url.clone()
};
let result = if let Some(notify_url) = &compute_hook_url {
self.do_notify(notify_url, &request, cancel).await
} else {
self.do_notify_local(&request).await.map_err(|e| {

View File

@@ -203,11 +203,6 @@ struct Cli {
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<PathBuf>,
/// Neon local specific flag. When set, ignore [`Cli::control_plane_url`] and deliver
/// the compute notification directly (instead of via control plane).
#[arg(long, default_value = "false")]
use_local_compute_notifications: bool,
}
enum StrictMode {
@@ -373,9 +368,6 @@ async fn async_main() -> anyhow::Result<()> {
"neither `--compute-hook-url` nor `--control-plane-url` are set: this is only permitted in `--dev` mode"
);
}
StrictMode::Strict if args.use_local_compute_notifications => {
anyhow::bail!("`--use-local-compute-notifications` is only permitted in `--dev` mode");
}
StrictMode::Strict => {
tracing::info!("Starting in strict mode: configuration is OK.")
}
@@ -435,7 +427,6 @@ async fn async_main() -> anyhow::Result<()> {
use_https_safekeeper_api: args.use_https_safekeeper_api,
ssl_ca_certs,
timelines_onto_safekeepers: args.timelines_onto_safekeepers,
use_local_compute_notifications: args.use_local_compute_notifications,
};
// Validate that we can connect to the database

View File

@@ -448,8 +448,6 @@ pub struct Config {
pub ssl_ca_certs: Vec<Certificate>,
pub timelines_onto_safekeepers: bool,
pub use_local_compute_notifications: bool,
}
impl From<DatabaseError> for ApiError {

View File

@@ -1169,12 +1169,6 @@ class NeonEnv:
if storage_controller_config is not None:
cfg["storage_controller"] = storage_controller_config
if config.test_may_use_compatibility_snapshot_binaries:
if "storage_controller" in cfg:
cfg["storage_controller"]["use_local_compute_notifications"] = False
else:
cfg["storage_controller"] = {"use_local_compute_notifications": False}
# Create config for pageserver
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"

View File

@@ -82,7 +82,6 @@ def test_storage_controller_many_tenants(
# guard against regressions in restart time.
"max_offline": "30s",
"max_warming_up": "300s",
"use_local_compute_notifications": False,
}
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api

View File

@@ -5,9 +5,11 @@ import asyncio
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.remote_storage import RemoteStorageKind
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
def test_change_pageserver(neon_env_builder: NeonEnvBuilder, make_httpserver):
"""
A relatively low level test of reconfiguring a compute's pageserver at runtime. Usually this
is all done via the storage controller, but this test will disable the storage controller's compute
@@ -21,6 +23,19 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
)
env = neon_env_builder.init_start()
neon_env_builder.control_plane_hooks_api = (
f"http://{make_httpserver.host}:{make_httpserver.port}/"
)
def ignore_notify(request: Request):
# This test does direct updates to compute configuration: disable the storage controller's notification
log.info(f"Ignoring storage controller compute notification: {request.json}")
return Response(status=200)
make_httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(
ignore_notify
)
env.create_branch("test_change_pageserver")
endpoint = env.endpoints.create_start("test_change_pageserver")

View File

@@ -12,7 +12,6 @@ import fixtures.utils
import pytest
import toml
from fixtures.common_types import TenantId, TimelineId
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
@@ -593,22 +592,17 @@ def test_historic_storage_formats(
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.parametrize(
**fixtures.utils.allpairs_versions(),
)
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
def test_versions_mismatch(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_version: PgVersion,
compatibility_snapshot_dir,
compute_reconfigure_listener: ComputeReconfigure,
combination,
):
"""
Checks compatibility of different combinations of versions of the components
"""
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.from_repo_dir(
compatibility_snapshot_dir / "repo",

View File

@@ -91,8 +91,6 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
f"http://{make_httpserver.host}:{make_httpserver.port}/"
)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
def ignore_notify(request: Request):
# This test does all its own compute configuration (by passing explicit pageserver ID to Workload functions),
# so we send controller notifications to /dev/null to prevent it fighting the test for control of the compute.

View File

@@ -808,8 +808,6 @@ def test_sharding_split_stripe_size(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
env = neon_env_builder.init_start(
initial_tenant_shard_count=1, initial_tenant_shard_stripe_size=initial_stripe_size
)
@@ -1318,11 +1316,6 @@ def test_sharding_split_failures(
initial_shard_count = 2
split_shard_count = 4
neon_env_builder.storage_controller_config = {
# Route to `compute_reconfigure_listener` instead
"use_local_compute_notifications": False,
}
env = neon_env_builder.init_configs()
env.start()

View File

@@ -73,9 +73,7 @@ def get_node_shard_counts(env: NeonEnv, tenant_ids):
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
def test_storage_controller_smoke(
neon_env_builder: NeonEnvBuilder, compute_reconfigure_listener: ComputeReconfigure, combination
):
def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination):
"""
Test the basic lifecycle of a storage controller:
- Restarting
@@ -85,7 +83,6 @@ def test_storage_controller_smoke(
"""
neon_env_builder.num_pageservers = 3
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
env = neon_env_builder.init_configs()
# Start services by hand so that we can skip a pageserver (this will start + register later)
@@ -623,8 +620,6 @@ def test_storage_controller_compute_hook(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
@@ -743,8 +738,6 @@ def test_storage_controller_stuck_compute_hook(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
@@ -892,8 +885,6 @@ def test_storage_controller_compute_hook_retry(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_configs()
env.start()
@@ -1017,8 +1008,6 @@ def test_storage_controller_compute_hook_revert(
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
tenant_id = env.initial_tenant
@@ -1409,11 +1398,6 @@ def test_storage_controller_tenant_deletion(
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
neon_env_builder.storage_controller_config = {
# Route to `compute_reconfigure_listener` instead
"use_local_compute_notifications": False,
}
env = neon_env_builder.init_configs()
env.start()
@@ -2192,12 +2176,7 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
@pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
@pytest.mark.parametrize("num_azs", [1, 2])
def test_graceful_cluster_restart(
neon_env_builder: NeonEnvBuilder,
num_azs: int,
compute_reconfigure_listener: ComputeReconfigure,
combination,
):
def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder, num_azs: int, combination):
"""
Graceful reststart of storage controller clusters use the drain and
fill hooks in order to migrate attachments away from pageservers before
@@ -2209,7 +2188,6 @@ def test_graceful_cluster_restart(
"""
neon_env_builder.num_azs = num_azs
neon_env_builder.num_pageservers = 2
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
env = neon_env_builder.init_configs()
env.start()
@@ -2465,6 +2443,7 @@ def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("while_offline", [True, False])
def test_storage_controller_node_deletion(
neon_env_builder: NeonEnvBuilder,
compute_reconfigure_listener: ComputeReconfigure,
while_offline: bool,
):
"""