diff --git a/Cargo.lock b/Cargo.lock index 8438dad41b..b0c7aec6ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1348,6 +1348,7 @@ dependencies = [ "tokio-postgres", "tokio-util", "toml", + "toml_edit", "tracing", "url", "utils", diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index 2ce041068e..e62f3b8a47 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -28,6 +28,7 @@ serde_with.workspace = true tar.workspace = true thiserror.workspace = true toml.workspace = true +toml_edit.workspace = true tokio.workspace = true tokio-postgres.workspace = true tokio-util.workspace = true diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index e01d5c9799..3f09042d9d 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -133,7 +133,7 @@ fn main() -> Result<()> { let subcommand_result = match sub_name { "tenant" => rt.block_on(handle_tenant(sub_args, &mut env)), "timeline" => rt.block_on(handle_timeline(sub_args, &mut env)), - "start" => rt.block_on(handle_start_all(sub_args, &env)), + "start" => rt.block_on(handle_start_all(&env)), "stop" => rt.block_on(handle_stop_all(sub_args, &env)), "pageserver" => rt.block_on(handle_pageserver(sub_args, &env)), "storage_controller" => rt.block_on(handle_storage_controller(sub_args, &env)), @@ -358,6 +358,13 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result { default_conf(*num_pageservers) }; + let pageserver_config: toml_edit::Document = + if let Some(path) = init_match.get_one::("pageserver-config") { + std::fs::read_to_string(path)?.parse()? + } else { + toml_edit::Document::new() + }; + let pg_version = init_match .get_one::("pg-version") .copied() @@ -375,7 +382,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result { // Initialize pageserver, create initial tenant and timeline. for ps_conf in &env.pageservers { PageServerNode::from_env(&env, ps_conf) - .initialize(&pageserver_config_overrides(init_match)) + .initialize(&pageserver_config) .unwrap_or_else(|e| { eprintln!("pageserver init failed: {e:?}"); exit(1); @@ -397,15 +404,6 @@ fn get_default_pageserver(env: &local_env::LocalEnv) -> PageServerNode { PageServerNode::from_env(env, ps_conf) } -fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> { - init_match - .get_many::("pageserver-config-override") - .into_iter() - .flatten() - .map(String::as_str) - .collect() -} - async fn handle_tenant( tenant_match: &ArgMatches, env: &mut local_env::LocalEnv, @@ -1076,10 +1074,7 @@ fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result Result<()> { match sub_match.subcommand() { Some(("start", subcommand_args)) => { - if let Err(e) = get_pageserver(env, subcommand_args)? - .start(&pageserver_config_overrides(subcommand_args)) - .await - { + if let Err(e) = get_pageserver(env, subcommand_args)?.start().await { eprintln!("pageserver start failed: {e}"); exit(1); } @@ -1105,10 +1100,7 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> exit(1); } - if let Err(e) = pageserver - .start(&pageserver_config_overrides(subcommand_args)) - .await - { + if let Err(e) = pageserver.start().await { eprintln!("pageserver start failed: {e}"); exit(1); } @@ -1235,7 +1227,7 @@ async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Ok(()) } -async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> { +async fn handle_start_all(env: &local_env::LocalEnv) -> anyhow::Result<()> { // Endpoints are not started automatically broker::start_broker_process(env).await?; @@ -1252,10 +1244,7 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> for ps_conf in &env.pageservers { let pageserver = PageServerNode::from_env(env, ps_conf); - if let Err(e) = pageserver - .start(&pageserver_config_overrides(sub_match)) - .await - { + if let Err(e) = pageserver.start().await { eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e); try_stop_all(env, true).await; exit(1); @@ -1396,13 +1385,6 @@ fn cli() -> Command { .required(false) .value_name("stop-mode"); - let pageserver_config_args = Arg::new("pageserver-config-override") - .long("pageserver-config-override") - .num_args(1) - .action(ArgAction::Append) - .help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more") - .required(false); - let remote_ext_config_args = Arg::new("remote-ext-config") .long("remote-ext-config") .num_args(1) @@ -1464,14 +1446,21 @@ fn cli() -> Command { .subcommand( Command::new("init") .about("Initialize a new Neon repository, preparing configs for services to start with") - .arg(pageserver_config_args.clone()) .arg(num_pageservers_arg.clone()) .arg( Arg::new("config") .long("config") .required(false) .value_parser(value_parser!(PathBuf)) - .value_name("config"), + .value_name("config") + ) + .arg( + Arg::new("pageserver-config") + .long("pageserver-config") + .required(false) + .value_parser(value_parser!(PathBuf)) + .value_name("pageserver-config") + .help("Merge the provided pageserver config into the one generated by neon_local."), ) .arg(pg_version_arg.clone()) .arg(force_arg) @@ -1553,7 +1542,6 @@ fn cli() -> Command { .subcommand(Command::new("status")) .subcommand(Command::new("start") .about("Start local pageserver") - .arg(pageserver_config_args.clone()) ) .subcommand(Command::new("stop") .about("Stop local pageserver") @@ -1561,7 +1549,6 @@ fn cli() -> Command { ) .subcommand(Command::new("restart") .about("Restart local pageserver") - .arg(pageserver_config_args.clone()) ) ) .subcommand( @@ -1676,7 +1663,6 @@ fn cli() -> Command { .subcommand( Command::new("start") .about("Start page server and safekeepers") - .arg(pageserver_config_args) ) .subcommand( Command::new("stop") diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 45be14ef95..fbe0d419ae 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -76,7 +76,7 @@ impl PageServerNode { /// Merge overrides provided by the user on the command line with our default overides derived from neon_local configuration. /// /// These all end up on the command line of the `pageserver` binary. - fn neon_local_overrides(&self, cli_overrides: &[&str]) -> Vec { + fn neon_local_overrides(&self, cli_overrides: &toml_edit::Document) -> Vec { // FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc. let pg_distrib_dir_param = format!( "pg_distrib_dir='{}'", @@ -156,10 +156,7 @@ impl PageServerNode { } } - if !cli_overrides - .iter() - .any(|c| c.starts_with("remote_storage")) - { + if !cli_overrides.contains_key("remote_storage") { overrides.push(format!( "remote_storage={{local_path='../{PAGESERVER_REMOTE_STORAGE_DIR}'}}" )); @@ -172,13 +169,13 @@ impl PageServerNode { } // Apply the user-provided overrides - overrides.extend(cli_overrides.iter().map(|&c| c.to_owned())); + overrides.push(cli_overrides.to_string()); overrides } /// Initializes a pageserver node by creating its config with the overrides provided. - pub fn initialize(&self, config_overrides: &[&str]) -> anyhow::Result<()> { + pub fn initialize(&self, config_overrides: &toml_edit::Document) -> anyhow::Result<()> { // First, run `pageserver --init` and wait for it to write a config into FS and exit. self.pageserver_init(config_overrides) .with_context(|| format!("Failed to run init for pageserver node {}", self.conf.id)) @@ -196,11 +193,11 @@ impl PageServerNode { .expect("non-Unicode path") } - pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> { - self.start_node(config_overrides).await + pub async fn start(&self) -> anyhow::Result<()> { + self.start_node().await } - fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> { + fn pageserver_init(&self, config_overrides: &toml_edit::Document) -> anyhow::Result<()> { let datadir = self.repo_path(); let node_id = self.conf.id; println!( @@ -268,7 +265,7 @@ impl PageServerNode { Ok(()) } - async fn start_node(&self, config_overrides: &[&str]) -> anyhow::Result<()> { + async fn start_node(&self) -> anyhow::Result<()> { // TODO: using a thread here because start_process() is not async but we need to call check_status() let datadir = self.repo_path(); print!( @@ -285,11 +282,7 @@ impl PageServerNode { self.conf.id, datadir, ) })?; - let mut args = vec!["-D", datadir_path_str]; - for config_override in config_overrides { - args.push("--config-override"); - args.push(*config_override); - } + let args = vec!["-D", datadir_path_str]; background_process::start_process( "pageserver", &datadir, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 1e4de9a888..82f17fe20d 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -14,7 +14,7 @@ import textwrap import threading import time import uuid -from contextlib import closing, contextmanager +from contextlib import ExitStack, closing, contextmanager from dataclasses import dataclass from datetime import datetime from enum import Enum @@ -68,7 +68,7 @@ from fixtures.remote_storage import ( RemoteStorageUser, S3Storage, default_remote_storage, - remote_storage_to_toml_inline_table, + remote_storage_to_toml_dict, ) from fixtures.safekeeper.http import SafekeeperHttpClient from fixtures.safekeeper.utils import are_walreceivers_absent @@ -1705,36 +1705,44 @@ class NeonCli(AbstractNeonCli): force: Optional[str] = None, pageserver_config_override: Optional[str] = None, ) -> "subprocess.CompletedProcess[str]": - with tempfile.NamedTemporaryFile(mode="w+") as tmp: - tmp.write(toml.dumps(config)) - tmp.flush() + remote_storage = self.env.pageserver_remote_storage - cmd = ["init", f"--config={tmp.name}", "--pg-version", self.env.pg_version] + ps_config = {} + if remote_storage is not None: + ps_config["remote_storage"] = remote_storage_to_toml_dict(remote_storage) + + if pageserver_config_override is not None: + for o in pageserver_config_override.split(";"): + override = toml.loads(o) + for key, value in override.items(): + ps_config[key] = value + + with ExitStack() as stack: + ps_config_file = stack.enter_context(tempfile.NamedTemporaryFile(mode="w+")) + ps_config_file.write(toml.dumps(ps_config)) + ps_config_file.flush() + + neon_local_config = stack.enter_context(tempfile.NamedTemporaryFile(mode="w+")) + neon_local_config.write(toml.dumps(config)) + neon_local_config.flush() + + cmd = [ + "init", + f"--config={neon_local_config.name}", + "--pg-version", + self.env.pg_version, + f"--pageserver-config={ps_config_file.name}", + ] if force is not None: cmd.extend(["--force", force]) - remote_storage = self.env.pageserver_remote_storage - - if remote_storage is not None: - remote_storage_toml_table = remote_storage_to_toml_inline_table(remote_storage) - - cmd.append( - f"--pageserver-config-override=remote_storage={remote_storage_toml_table}" - ) - - if pageserver_config_override is not None: - cmd += [ - f"--pageserver-config-override={o.strip()}" - for o in pageserver_config_override.split(";") - ] - s3_env_vars = None if isinstance(remote_storage, S3Storage): s3_env_vars = remote_storage.access_env_vars() res = self.raw_cli(cmd, extra_env_vars=s3_env_vars) res.check_returncode() - return res + return res def storage_controller_start(self): cmd = ["storage_controller", "start"] @@ -1749,10 +1757,9 @@ class NeonCli(AbstractNeonCli): def pageserver_start( self, id: int, - overrides: Tuple[str, ...] = (), extra_env_vars: Optional[Dict[str, str]] = None, ) -> "subprocess.CompletedProcess[str]": - start_args = ["pageserver", "start", f"--id={id}", *overrides] + start_args = ["pageserver", "start", f"--id={id}"] storage = self.env.pageserver_remote_storage if isinstance(storage, S3Storage): @@ -2417,9 +2424,42 @@ class NeonPageserver(PgProtocol, LogUtils): return self.workdir / "tenants" return self.workdir / "tenants" / str(tenant_shard_id) + @property + def config_toml_path(self) -> Path: + return self.workdir / "pageserver.toml" + + def edit_config_toml(self, edit_fn: Callable[[Dict[str, Any]], None]): + """ + Edit the pageserver's config toml file in place. + """ + path = self.config_toml_path + with open(path, "r") as f: + config = toml.load(f) + edit_fn(config) + with open(path, "w") as f: + toml.dump(config, f) + + def patch_config_toml_nonrecursive(self, patch: Dict[str, Any]) -> Dict[str, Any]: + """ + Non-recursively merge the given `patch` dict into the existing config toml, using `dict.update()`. + Returns the replaced values. + If there was no previous value, the key is mapped to None. + This allows to restore the original value by calling this method with the returned dict. + """ + replacements = {} + + def doit(config: Dict[str, Any]): + while len(patch) > 0: + key, new = patch.popitem() + old = config.get(key, None) + config[key] = new + replacements[key] = old + + self.edit_config_toml(doit) + return replacements + def start( self, - overrides: Tuple[str, ...] = (), extra_env_vars: Optional[Dict[str, str]] = None, ) -> "NeonPageserver": """ @@ -2429,9 +2469,7 @@ class NeonPageserver(PgProtocol, LogUtils): """ assert self.running is False - self.env.neon_cli.pageserver_start( - self.id, overrides=overrides, extra_env_vars=extra_env_vars - ) + self.env.neon_cli.pageserver_start(self.id, extra_env_vars=extra_env_vars) self.running = True return self diff --git a/test_runner/fixtures/remote_storage.py b/test_runner/fixtures/remote_storage.py index 83f9f26837..925e1b450f 100644 --- a/test_runner/fixtures/remote_storage.py +++ b/test_runner/fixtures/remote_storage.py @@ -141,11 +141,13 @@ class LocalFsStorage: with self.heatmap_path(tenant_id).open("r") as f: return json.load(f) - def to_toml_inline_table(self) -> str: - rv = { + def to_toml_dict(self) -> Dict[str, Any]: + return { "local_path": str(self.root), } - return toml.TomlEncoder().dump_inline_table(rv) + + def to_toml_inline_table(self) -> str: + return toml.TomlEncoder().dump_inline_table(self.to_toml_dict()) def cleanup(self): # no cleanup is done here, because there's NeonEnvBuilder.cleanup_local_storage which will remove everything, including localfs files @@ -194,7 +196,7 @@ class S3Storage: } ) - def to_toml_inline_table(self) -> str: + def to_toml_dict(self) -> Dict[str, Any]: rv = { "bucket_name": self.bucket_name, "bucket_region": self.bucket_region, @@ -206,7 +208,10 @@ class S3Storage: if self.endpoint is not None: rv["endpoint"] = self.endpoint - return toml.TomlEncoder().dump_inline_table(rv) + return rv + + def to_toml_inline_table(self) -> str: + return toml.TomlEncoder().dump_inline_table(self.to_toml_dict()) def do_cleanup(self): if not self.cleanup: @@ -414,6 +419,13 @@ def default_remote_storage() -> RemoteStorageKind: return RemoteStorageKind.LOCAL_FS +def remote_storage_to_toml_dict(remote_storage: RemoteStorage) -> Dict[str, Any]: + if not isinstance(remote_storage, (LocalFsStorage, S3Storage)): + raise Exception("invalid remote storage type") + + return remote_storage.to_toml_dict() + + # serialize as toml inline table def remote_storage_to_toml_inline_table(remote_storage: RemoteStorage) -> str: if not isinstance(remote_storage, (LocalFsStorage, S3Storage)): diff --git a/test_runner/performance/test_branch_creation.py b/test_runner/performance/test_branch_creation.py index 54905759bd..7687b8417f 100644 --- a/test_runner/performance/test_branch_creation.py +++ b/test_runner/performance/test_branch_creation.py @@ -140,10 +140,14 @@ def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int, shape: # start without gc so we can time compaction with less noise; use shorter # period for compaction so it starts earlier + def patch_default_tenant_config(config): + tenant_config = config.get("tenant_config", {}) + tenant_config["compaction_period"] = "3s" + tenant_config["gc_period"] = "0s" + config["tenant_config"] = tenant_config + + env.pageserver.edit_config_toml(patch_default_tenant_config) env.pageserver.start( - overrides=( - "--pageserver-config-override=tenant_config={ compaction_period = '3s', gc_period = '0s' }", - ), # this does print more than we want, but the number should be comparable between runs extra_env_vars={ "RUST_LOG": f"[compaction_loop{{tenant_id={env.initial_tenant}}}]=debug,info" diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py index b83545216d..5e9efa7cce 100644 --- a/test_runner/regress/test_disk_usage_eviction.py +++ b/test_runner/regress/test_disk_usage_eviction.py @@ -5,7 +5,6 @@ from dataclasses import dataclass from typing import Any, Dict, Iterable, Tuple import pytest -import toml from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnv, @@ -45,17 +44,16 @@ def test_min_resident_size_override_handling( ps_http.set_tenant_config(tenant_id, {}) assert_config(tenant_id, None, default_tenant_conf_value) - env.pageserver.stop() if config_level_override is not None: - env.pageserver.start( - overrides=( - "--pageserver-config-override=tenant_config={ min_resident_size_override = " - + str(config_level_override) - + " }", - ) - ) - else: - env.pageserver.start() + + def set_min_resident_size(config): + tenant_config = config.get("tenant_config", {}) + tenant_config["min_resident_size_override"] = config_level_override + config["tenant_config"] = tenant_config + + env.pageserver.edit_config_toml(set_min_resident_size) + env.pageserver.stop() + env.pageserver.start() tenant_id, _ = env.neon_cli.create_tenant() assert_overrides(tenant_id, config_level_override) @@ -164,34 +162,32 @@ class EvictionEnv: usage eviction task is unknown; it might need to run one more iteration before assertions can be made. """ - disk_usage_config = { - "period": period, - "max_usage_pct": max_usage_pct, - "min_avail_bytes": min_avail_bytes, - "mock_statvfs": mock_behavior, - "eviction_order": eviction_order.config(), - } - - enc = toml.TomlEncoder() # these can sometimes happen during startup before any tenants have been # loaded, so nothing can be evicted, we just wait for next iteration which # is able to evict. pageserver.allowed_errors.append(".*WARN.* disk usage still high.*") - pageserver.start( - overrides=( - "--pageserver-config-override=disk_usage_based_eviction=" - + enc.dump_inline_table(disk_usage_config).replace("\n", " "), + pageserver.patch_config_toml_nonrecursive( + { + "disk_usage_based_eviction": { + "period": period, + "max_usage_pct": max_usage_pct, + "min_avail_bytes": min_avail_bytes, + "mock_statvfs": mock_behavior, + "eviction_order": eviction_order.config(), + }, # Disk usage based eviction runs as a background task. # But pageserver startup delays launch of background tasks for some time, to prioritize initial logical size calculations during startup. # But, initial logical size calculation may not be triggered if safekeepers don't publish new broker messages. # But, we only have a 10-second-timeout in this test. # So, disable the delay for this test. - "--pageserver-config-override=background_task_maximum_delay='0s'", - ), + "background_task_maximum_delay": "0s", + } ) + pageserver.start() + # we now do initial logical size calculation on startup, which on debug builds can fight with disk usage based eviction for tenant_id, timeline_id in self.timelines: tenant_ps = self.neon_env.get_tenant_pageserver(tenant_id) diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index 67f68a62af..f957bea156 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -220,7 +220,12 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder): # We will start a pageserver with no control_plane_api set, so it won't be able to self-register env.storage_controller.node_register(env.pageserver) - env.pageserver.start(overrides=('--pageserver-config-override=control_plane_api=""',)) + replaced_config = env.pageserver.patch_config_toml_nonrecursive( + { + "control_plane_api": "", + } + ) + env.pageserver.start() env.storage_controller.node_configure(env.pageserver.id, {"availability": "Active"}) env.neon_cli.create_tenant( @@ -251,8 +256,8 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder): assert parse_generation_suffix(key) is None env.pageserver.stop() - # Starting without the override that disabled control_plane_api + env.pageserver.patch_config_toml_nonrecursive(replaced_config) env.pageserver.start() generate_uploads_and_deletions(env, pageserver=env.pageserver, init=False) @@ -525,9 +530,12 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): # incident, but it might be unavoidable: if so, we want to be able to start up # and serve clients. env.pageserver.stop() # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP - env.pageserver.start( - overrides=("--pageserver-config-override=control_plane_emergency_mode=true",), + replaced = env.pageserver.patch_config_toml_nonrecursive( + { + "control_plane_emergency_mode": True, + } ) + env.pageserver.start() # The pageserver should provide service to clients generate_uploads_and_deletions(env, init=False, pageserver=env.pageserver) @@ -549,6 +557,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): # The pageserver should work fine when subsequently restarted in non-emergency mode env.pageserver.stop() # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP + env.pageserver.patch_config_toml_nonrecursive(replaced) env.pageserver.start() generate_uploads_and_deletions(env, init=False, pageserver=env.pageserver) diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index fdcb4cf9a4..bdd356388f 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -290,9 +290,12 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up # This is the pageserver where we'll initially create the tenant. Run it in emergency # mode so that it doesn't talk to storage controller, and do not register it. env.pageservers[0].allowed_errors.append(".*Emergency mode!.*") - env.pageservers[0].start( - overrides=("--pageserver-config-override=control_plane_emergency_mode=true",), + env.pageservers[0].patch_config_toml_nonrecursive( + { + "control_plane_emergency_mode": True, + } ) + env.pageservers[0].start() origin_ps = env.pageservers[0] # These are the pageservers managed by the sharding service, where the tenant