diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index 8909e27c94..1f3f8f45ea 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -136,22 +136,6 @@ where anyhow::bail!("{process_name} did not start in {RETRY_UNTIL_SECS} seconds"); } -/// Send SIGTERM to child process -pub fn send_stop_child_process(child: &std::process::Child) -> anyhow::Result<()> { - let pid = child.id(); - match kill( - nix::unistd::Pid::from_raw(pid.try_into().unwrap()), - Signal::SIGTERM, - ) { - Ok(()) => Ok(()), - Err(Errno::ESRCH) => { - println!("child process with pid {pid} does not exist"); - Ok(()) - } - Err(e) => anyhow::bail!("Failed to send signal to child process with pid {pid}: {e}"), - } -} - /// Stops the process, using the pid file given. Returns Ok also if the process is already not running. pub fn stop_process(immediate: bool, process_name: &str, pid_file: &Path) -> anyhow::Result<()> { let pid = match pid_file::read(pid_file) diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 71de741640..e4d0680c9e 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -284,8 +284,6 @@ fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result anyhow::Result { - let initial_timeline_id_arg = parse_timeline_id(init_match)?; - // Create config file let toml_file: String = if let Some(config_path) = init_match.get_one::("config") { // load and parse the file @@ -309,30 +307,16 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result { LocalEnv::parse_config(&toml_file).context("Failed to create neon configuration")?; env.init(pg_version) .context("Failed to initialize neon repository")?; - let initial_tenant_id = env - .default_tenant_id - .expect("default_tenant_id should be generated by the `env.init()` call above"); // Initialize pageserver, create initial tenant and timeline. let pageserver = PageServerNode::from_env(&env); - let initial_timeline_id = pageserver - .initialize( - Some(initial_tenant_id), - initial_timeline_id_arg, - &pageserver_config_overrides(init_match), - pg_version, - ) + pageserver + .initialize(&pageserver_config_overrides(init_match)) .unwrap_or_else(|e| { eprintln!("pageserver init failed: {e:?}"); exit(1); }); - env.register_branch_mapping( - DEFAULT_BRANCH_NAME.to_owned(), - initial_tenant_id, - initial_timeline_id, - )?; - Ok(env) } @@ -928,9 +912,8 @@ fn cli() -> Command { .version(GIT_VERSION) .subcommand( Command::new("init") - .about("Initialize a new Neon repository") + .about("Initialize a new Neon repository, preparing configs for services to start with") .arg(pageserver_config_args.clone()) - .arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline")) .arg( Arg::new("config") .long("config") diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 68e94b2fdc..9cebe028e4 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -7,7 +7,7 @@ use std::path::PathBuf; use std::process::{Child, Command}; use std::{io, result}; -use anyhow::{bail, ensure, Context}; +use anyhow::{bail, Context}; use pageserver_api::models::{ TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo, }; @@ -130,83 +130,15 @@ impl PageServerNode { overrides } - /// Initializes a pageserver node by creating its config with the overrides provided, - /// and creating an initial tenant and timeline afterwards. - pub fn initialize( - &self, - create_tenant: Option, - initial_timeline_id: Option, - config_overrides: &[&str], - pg_version: u32, - ) -> anyhow::Result { + /// Initializes a pageserver node by creating its config with the overrides provided. + pub fn initialize(&self, config_overrides: &[&str]) -> anyhow::Result<()> { // First, run `pageserver --init` and wait for it to write a config into FS and exit. self.pageserver_init(config_overrides).with_context(|| { format!( "Failed to run init for pageserver node {}", self.env.pageserver.id, ) - })?; - - // Then, briefly start it fully to run HTTP commands on it, - // to create initial tenant and timeline. - // We disable the remote storage, since we stop pageserver right after the timeline creation, - // hence most of the uploads will either aborted or not started: no point to start them at all. - let disabled_remote_storage_override = "remote_storage={}"; - let mut pageserver_process = self - .start_node( - &[disabled_remote_storage_override], - // Previous overrides will be taken from the config created before, don't overwrite them. - false, - ) - .with_context(|| { - format!( - "Failed to start a process for pageserver node {}", - self.env.pageserver.id, - ) - })?; - - let init_result = self - .try_init_timeline(create_tenant, initial_timeline_id, pg_version) - .context("Failed to create initial tenant and timeline for pageserver"); - match &init_result { - Ok(initial_timeline_id) => { - println!("Successfully initialized timeline {initial_timeline_id}") - } - Err(e) => eprintln!("{e:#}"), - } - background_process::send_stop_child_process(&pageserver_process)?; - - let exit_code = pageserver_process.wait()?; - ensure!( - exit_code.success(), - format!( - "pageserver init failed with exit code {:?}", - exit_code.code() - ) - ); - println!( - "Stopped pageserver {} process with pid {}", - self.env.pageserver.id, - pageserver_process.id(), - ); - init_result - } - - fn try_init_timeline( - &self, - new_tenant_id: Option, - new_timeline_id: Option, - pg_version: u32, - ) -> anyhow::Result { - let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?; - let initial_timeline_info = self.timeline_create( - initial_tenant_id, - new_timeline_id, - None, - None, - Some(pg_version), - )?; - Ok(initial_timeline_info.timeline_id) + }) } pub fn repo_path(&self) -> PathBuf { diff --git a/libs/utils/src/postgres_backend_async.rs b/libs/utils/src/postgres_backend_async.rs index a4f523da04..95b7b3fd15 100644 --- a/libs/utils/src/postgres_backend_async.rs +++ b/libs/utils/src/postgres_backend_async.rs @@ -20,7 +20,10 @@ use tokio_rustls::TlsAcceptor; pub fn is_expected_io_error(e: &io::Error) -> bool { use io::ErrorKind::*; - matches!(e.kind(), ConnectionRefused | ConnectionAborted) + matches!( + e.kind(), + ConnectionRefused | ConnectionAborted | ConnectionReset + ) } /// An error, occurred during query processing: diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 481f46ff55..97bc694543 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -596,6 +596,7 @@ class NeonEnvBuilder: rust_log_override: Optional[str] = None, default_branch_name: str = DEFAULT_BRANCH_NAME, preserve_database_files: bool = False, + initial_tenant: Optional[TenantId] = None, ): self.repo_dir = repo_dir self.rust_log_override = rust_log_override @@ -618,8 +619,9 @@ class NeonEnvBuilder: self.pg_distrib_dir = pg_distrib_dir self.pg_version = pg_version self.preserve_database_files = preserve_database_files + self.initial_tenant = initial_tenant or TenantId.generate() - def init(self) -> NeonEnv: + def init_configs(self) -> NeonEnv: # Cannot create more than one environment from one builder assert self.env is None, "environment already initialized" self.env = NeonEnv(self) @@ -630,8 +632,17 @@ class NeonEnvBuilder: self.env.start() def init_start(self) -> NeonEnv: - env = self.init() + env = self.init_configs() self.start() + + # Prepare the default branch to start the postgres on later. + # Pageserver itself does not create tenants and timelines, until started first and asked via HTTP API. + log.info( + f"Services started, creating initial tenant {env.initial_tenant} and its initial timeline" + ) + initial_tenant, initial_timeline = env.neon_cli.create_tenant(tenant_id=env.initial_tenant) + log.info(f"Initial timeline {initial_tenant}/{initial_timeline} created successfully") + return env def enable_remote_storage( @@ -890,12 +901,12 @@ class NeonEnv: # generate initial tenant ID here instead of letting 'neon init' generate it, # so that we don't need to dig it out of the config file afterwards. - self.initial_tenant = TenantId.generate() + self.initial_tenant = config.initial_tenant # Create a config file corresponding to the options toml = textwrap.dedent( f""" - default_tenant_id = '{self.initial_tenant}' + default_tenant_id = '{config.initial_tenant}' """ ) @@ -1724,17 +1735,12 @@ class NeonCli(AbstractNeonCli): def init( self, config_toml: str, - initial_timeline_id: Optional[TimelineId] = None, ) -> "subprocess.CompletedProcess[str]": with tempfile.NamedTemporaryFile(mode="w+") as tmp: tmp.write(config_toml) tmp.flush() - cmd = ["init", f"--config={tmp.name}"] - if initial_timeline_id: - cmd.extend(["--timeline-id", str(initial_timeline_id)]) - - cmd.extend(["--pg-version", self.env.pg_version]) + cmd = ["init", f"--config={tmp.name}", "--pg-version", self.env.pg_version] append_pageserver_param_overrides( params_to_update=cmd, diff --git a/test_runner/regress/test_metric_collection.py b/test_runner/regress/test_metric_collection.py index 0fff86f268..d1fcab7a62 100644 --- a/test_runner/regress/test_metric_collection.py +++ b/test_runner/regress/test_metric_collection.py @@ -1,3 +1,5 @@ +import time + import pytest from fixtures.log_helper import log from fixtures.metrics import parse_metrics @@ -20,9 +22,19 @@ def httpserver_listen_address(port_distributor: PortDistributor): return ("localhost", port) -num_metrics_received = 0 +initial_tenant = TenantId.generate() remote_uploaded = 0 -first_request = True +checks = { + "written_size": lambda value: value > 0, + "resident_size": lambda value: value >= 0, + # >= 0 check here is to avoid race condition when we receive metrics before + # remote_uploaded is updated + "remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value >= 0, + # logical size may lag behind the actual size, so allow 0 here + "timeline_logical_size": lambda value: value >= 0, +} + +metric_kinds_checked = set([]) # @@ -36,38 +48,19 @@ def metrics_handler(request: Request) -> Response: log.info("received events:") log.info(events) - checks = { - "written_size": lambda value: value > 0, - "resident_size": lambda value: value >= 0, - # >= 0 check here is to avoid race condition when we receive metrics before - # remote_uploaded is updated - "remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value >= 0, - # logical size may lag behind the actual size, so allow 0 here - "timeline_logical_size": lambda value: value >= 0, - } - - events_received = 0 for event in events: - check = checks.get(event["metric"]) + assert event["tenant_id"] == str( + initial_tenant + ), "Expecting metrics only from the initial tenant" + metric_name = event["metric"] + + check = checks.get(metric_name) # calm down mypy if check is not None: - assert check(event["value"]), f"{event['metric']} isn't valid" - events_received += 1 + assert check(event["value"]), f"{metric_name} isn't valid" + global metric_kinds_checked + metric_kinds_checked.add(metric_name) - global first_request - # check that all checks were sent - # but only on the first request, because we don't send non-changed metrics - if first_request: - # we may receive more metrics than we check, - # because there are two timelines - # and we may receive per-timeline metrics from both - # if the test was slow enough for these metrics to be collected - # -1 because that is ok to not receive timeline_logical_size - assert events_received >= len(checks) - 1 - first_request = False - - global num_metrics_received - num_metrics_received += 1 return Response(status=200) @@ -83,11 +76,14 @@ def test_metric_collection( (host, port) = httpserver_listen_address metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events" + # Require collecting metrics frequently, since we change + # the timeline and want something to be logged about it. + # # Disable time-based pitr, we will use the manual GC calls # to trigger remote storage operations in a controlled way neon_env_builder.pageserver_config_override = ( f""" - metric_collection_interval="60s" + metric_collection_interval="1s" metric_collection_endpoint="{metric_collection_endpoint}" """ + "tenant_config={pitr_interval = '0 sec'}" @@ -100,6 +96,9 @@ def test_metric_collection( log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}") + # Set initial tenant of the test, that we expect the logs from + global initial_tenant + initial_tenant = neon_env_builder.initial_tenant # mock http server that returns OK for the metrics httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler( metrics_handler @@ -154,7 +153,11 @@ def test_metric_collection( remote_uploaded = get_num_remote_ops("index", "upload") assert remote_uploaded > 0 - # check that all requests are served + # wait longer than collecting interval and check that all requests are served + time.sleep(3) httpserver.check() - global num_metrics_received - assert num_metrics_received > 0, "no metrics were received" + global metric_kinds_checked, checks + expected_checks = set(checks.keys()) + assert len(metric_kinds_checked) == len( + checks + ), f"Expected to receive and check all kind of metrics, but {expected_checks - metric_kinds_checked} got uncovered" diff --git a/test_runner/regress/test_neon_local_cli.py b/test_runner/regress/test_neon_local_cli.py index 6c7cdb6f7f..e8f01ccf55 100644 --- a/test_runner/regress/test_neon_local_cli.py +++ b/test_runner/regress/test_neon_local_cli.py @@ -4,7 +4,7 @@ from fixtures.neon_fixtures import NeonEnvBuilder # Test that neon cli is able to start and stop all processes with the user defaults. # def test_neon_cli_basics(neon_simple_env: NeonEnv): def test_neon_cli_basics(neon_env_builder: NeonEnvBuilder): - env = neon_env_builder.init() + env = neon_env_builder.init_configs() env.neon_cli.start() env.neon_cli.stop() diff --git a/test_runner/regress/test_recovery.py b/test_runner/regress/test_recovery.py index 1e93958e98..09644eaaa1 100644 --- a/test_runner/regress/test_recovery.py +++ b/test_runner/regress/test_recovery.py @@ -12,11 +12,9 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder): # Override default checkpointer settings to run it more often neon_env_builder.pageserver_config_override = "tenant_config={checkpoint_distance = 1048576}" - env = neon_env_builder.init() + env = neon_env_builder.init_start() env.pageserver.is_testing_enabled_or_skip() - neon_env_builder.start() - # These warnings are expected, when the pageserver is restarted abruptly env.pageserver.allowed_errors.append(".*found future delta layer.*") env.pageserver.allowed_errors.append(".*found future image layer.*")