diff --git a/compute_tools/README.md b/compute_tools/README.md
index 446b441c18..e92e5920b9 100644
--- a/compute_tools/README.md
+++ b/compute_tools/README.md
@@ -54,11 +54,11 @@ stateDiagram-v2
     Running --> TerminationPendingImmediate : Requested termination
     Running --> ConfigurationPending : Received a /configure request with spec
     Running --> RefreshConfigurationPending : Received a /refresh_configuration request, compute node will pull a new spec and reconfigure
-    RefreshConfigurationPending --> Running : Compute has been re-configured
+    RefreshConfigurationPending --> RefreshConfiguration : Received compute spec and started configuration
+    RefreshConfiguration --> Running : Compute has been re-configured
+    RefreshConfiguration --> RefreshConfigurationPending : Configuration failed and to be retried
     TerminationPendingFast --> Terminated : Terminated compute with 30s delay for cplane to inspect status
     TerminationPendingImmediate --> Terminated : Terminated compute immediately
-    Running --> TerminationPending : Requested termination
-    TerminationPending --> Terminated : Terminated compute
     Failed --> RefreshConfigurationPending : Received a /refresh_configuration request
     Failed --> [*] : Compute exited
     Terminated --> [*] : Compute exited
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index ef7bca51b2..a240e69df8 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -1994,6 +1994,7 @@ impl ComputeNode {
                 // wait
                 ComputeStatus::Init
                 | ComputeStatus::Configuration
+                | ComputeStatus::RefreshConfiguration
                 | ComputeStatus::RefreshConfigurationPending
                 | ComputeStatus::Empty => {
                     state = self.state_changed.wait(state).unwrap();
diff --git a/compute_tools/src/configurator.rs b/compute_tools/src/configurator.rs
index 93900b5c2b..feca8337b2 100644
--- a/compute_tools/src/configurator.rs
+++ b/compute_tools/src/configurator.rs
@@ -2,6 +2,7 @@ use std::fs::File;
 use std::thread;
 use std::{path::Path, sync::Arc};
 
+use anyhow::Result;
 use compute_api::responses::{ComputeConfig, ComputeStatus};
 use tracing::{error, info, instrument};
@@ -13,6 +14,10 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
     info!("waiting for reconfiguration requests");
     loop {
         let mut state = compute.state.lock().unwrap();
+        /* BEGIN_HADRON */
+        // RefreshConfiguration should only be used inside the loop
+        assert_ne!(state.status, ComputeStatus::RefreshConfiguration);
+        /* END_HADRON */
 
         if compute.params.lakebase_mode {
             while state.status != ComputeStatus::ConfigurationPending
@@ -54,53 +59,68 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
                 info!(
                     "compute node suspects its configuration is out of date, now refreshing configuration"
                 );
-                // Drop the lock guard here to avoid holding the lock while downloading spec from the control plane / HCC.
-                // This is the only thread that can move compute_ctl out of the `RefreshConfigurationPending` state, so it
+                state.set_status(ComputeStatus::RefreshConfiguration, &compute.state_changed);
+                // Drop the lock guard here to avoid holding the lock while downloading config from the control plane / HCC.
+                // This is the only thread that can move compute_ctl out of the `RefreshConfiguration` state, so it
                 // is safe to drop the lock like this.
                 drop(state);
-                let spec = if let Some(config_path) = &compute.params.config_path_test_only {
-                    // This path is only to make testing easier. In production we always get the spec from the HCC.
-                    info!(
-                        "reloading config.json from path: {}",
-                        config_path.to_string_lossy()
-                    );
-                    let path = Path::new(config_path);
-                    if let Ok(file) = File::open(path) {
-                        match serde_json::from_reader::<_, ComputeConfig>(file) {
-                            Ok(config) => config.spec,
-                            Err(e) => {
-                                error!("could not parse spec file: {}", e);
-                                None
-                            }
-                        }
-                    } else {
-                        error!(
-                            "could not open config file at path: {}",
+                let get_config_result: anyhow::Result<ComputeConfig> =
+                    if let Some(config_path) = &compute.params.config_path_test_only {
+                        // This path is only to make testing easier. In production we always get the config from the HCC.
+                        info!(
+                            "reloading config.json from path: {}",
                             config_path.to_string_lossy()
                         );
-                        None
-                    }
-                } else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
-                    match get_config_from_control_plane(control_plane_uri, &compute.params.compute_id) {
-                        Ok(config) => config.spec,
-                        Err(e) => {
-                            error!("could not get config from control plane: {}", e);
-                            None
+                        let path = Path::new(config_path);
+                        if let Ok(file) = File::open(path) {
+                            match serde_json::from_reader::<_, ComputeConfig>(file) {
+                                Ok(config) => Ok(config),
+                                Err(e) => {
+                                    error!("could not parse config file: {}", e);
+                                    Err(anyhow::anyhow!("could not parse config file: {}", e))
+                                }
+                            }
+                        } else {
+                            error!(
+                                "could not open config file at path: {}",
+                                config_path.to_string_lossy()
+                            );
+                            Err(anyhow::anyhow!(
+                                "could not open config file at path: {}",
+                                config_path.to_string_lossy()
+                            ))
                         }
-                    }
-                } else {
-                    None
-                };
+                    } else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
+                        get_config_from_control_plane(control_plane_uri, &compute.params.compute_id)
+                    } else {
+                        Err(anyhow::anyhow!(
+                            "neither config_path_test_only nor control_plane_uri is set"
+                        ))
+                    };
-                if let Some(spec) = spec {
-                    if let Ok(pspec) = ParsedSpec::try_from(spec) {
+                // Parse any received ComputeSpec and transpose the result into a Result<Option<ParsedSpec>>.
+                let parsed_spec_result: Result<Option<ParsedSpec>> =
+                    get_config_result.and_then(|config| {
+                        if let Some(spec) = config.spec {
+                            if let Ok(pspec) = ParsedSpec::try_from(spec) {
+                                Ok(Some(pspec))
+                            } else {
+                                Err(anyhow::anyhow!("could not parse spec"))
+                            }
+                        } else {
+                            Ok(None)
+                        }
+                    });
+
+                let new_status: ComputeStatus;
+                match parsed_spec_result {
+                    // Control plane (HCM) returned a spec and we were able to parse it.
+                    Ok(Some(pspec)) => {
                         {
                             let mut state = compute.state.lock().unwrap();
                             // Defensive programming to make sure this thread is indeed the only one that can move the compute
-                            // node out of the `RefreshConfigurationPending` state. Would be nice if we can encode this invariant
+                            // node out of the `RefreshConfiguration` state. Would be nice if we can encode this invariant
                             // into the type system.
-                            assert_eq!(state.status, ComputeStatus::RefreshConfigurationPending);
+                            assert_eq!(state.status, ComputeStatus::RefreshConfiguration);
                             if state.pspec.as_ref().map(|ps| ps.pageserver_connstr.clone())
                                 == Some(pspec.pageserver_connstr.clone())
@@ -123,20 +143,45 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
                         match compute.reconfigure() {
                             Ok(_) => {
                                 info!("Refresh configuration: compute node configured");
-                                compute.set_status(ComputeStatus::Running);
+                                new_status = ComputeStatus::Running;
                             }
                             Err(e) => {
                                 error!(
                                     "Refresh configuration: could not configure compute node: {}",
                                     e
                                 );
-                                // Leave the compute node in the `RefreshConfigurationPending` state if the configuration
+                                // Set the compute node back to the `RefreshConfigurationPending` state if the configuration
                                 // was not successful. It should be okay to treat this situation the same as if the loop
                                 // hasn't executed yet as long as the detection side keeps notifying.
+                                new_status = ComputeStatus::RefreshConfigurationPending;
                             }
                         }
                     }
+                    // The control plane (HCM) response does not contain a spec. This is the "Empty" attachment case.
+                    Ok(None) => {
+                        info!(
+                            "Compute Manager signaled that this compute is no longer attached to any storage. Exiting."
+                        );
+                        // We just immediately terminate the whole compute_ctl in this case. It's not necessary to attempt a
+                        // clean shutdown as Postgres is probably not responding anyway (which is why we are in this refresh
+                        // configuration state).
+                        std::process::exit(1);
+                    }
+                    // Various error cases:
+                    // - The request to the control plane (HCM) either failed or returned a malformed spec.
+                    // - compute_ctl itself is configured incorrectly (e.g., compute_id is not set).
+                    Err(e) => {
+                        error!(
+                            "Refresh configuration: error getting a parsed spec: {:?}",
+                            e
+                        );
+                        new_status = ComputeStatus::RefreshConfigurationPending;
+                        // We may be dealing with an overloaded HCM if we end up in this path. Back off for 5 seconds before
+                        // retrying to avoid hammering the HCM.
+                        std::thread::sleep(std::time::Duration::from_secs(5));
+                    }
                 }
+                compute.set_status(new_status);
             } else if state.status == ComputeStatus::Failed {
                 info!("compute node is now in Failed state, exiting");
                 break;
diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index 20dcf85562..6221d83c7f 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -938,7 +938,8 @@ impl Endpoint {
             | ComputeStatus::TerminationPendingFast
             | ComputeStatus::TerminationPendingImmediate
             | ComputeStatus::Terminated
-            | ComputeStatus::RefreshConfigurationPending => {
+            | ComputeStatus::RefreshConfigurationPending
+            | ComputeStatus::RefreshConfiguration => {
                 bail!("unexpected compute status: {:?}", state.status)
             }
         }
diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs
index 7efd94c76a..a27301e45e 100644
--- a/libs/compute_api/src/responses.rs
+++ b/libs/compute_api/src/responses.rs
@@ -174,6 +174,9 @@ pub enum ComputeStatus {
     Terminated,
     // A spec refresh is being requested
     RefreshConfigurationPending,
+    // A spec refresh is being applied. We cannot refresh the configuration again until the current
+    // refresh is done, i.e., signal_refresh_configuration() will return a 500 error.
+    RefreshConfiguration,
 }
 
 #[derive(Deserialize, Serialize)]
@@ -186,6 +189,10 @@ impl Display for ComputeStatus {
         match self {
             ComputeStatus::Empty => f.write_str("empty"),
             ComputeStatus::ConfigurationPending => f.write_str("configuration-pending"),
+            ComputeStatus::RefreshConfiguration => f.write_str("refresh-configuration"),
+            ComputeStatus::RefreshConfigurationPending => {
+                f.write_str("refresh-configuration-pending")
+            }
             ComputeStatus::Init => f.write_str("init"),
             ComputeStatus::Running => f.write_str("running"),
             ComputeStatus::Configuration => f.write_str("configuration"),
@@ -195,9 +202,6 @@ impl Display for ComputeStatus {
                 f.write_str("termination-pending-immediate")
             }
             ComputeStatus::Terminated => f.write_str("terminated"),
-            ComputeStatus::RefreshConfigurationPending => {
-                f.write_str("refresh-configuration-pending")
-            }
         }
     }
 }
diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c
index ab0736e180..1031f185a6 100644
--- a/pgxn/neon/libpagestore.c
+++ b/pgxn/neon/libpagestore.c
@@ -89,6 +89,8 @@
 static int pageserver_response_log_timeout = 10000;
 static int pageserver_response_disconnect_timeout = 150000;
 static int conf_refresh_reconnect_attempt_threshold = 16;
+// Hadron: timeout for refresh errors (1 minute)
+static uint64 kRefreshErrorTimeoutUSec = 1 * USECS_PER_MINUTE;
 
 typedef struct
 {
@@ -1046,14 +1048,22 @@ pageserver_disconnect_shard(shardno_t shard_no)
 
 extern int hadron_extension_server_port;
 
-static void
+// The timestamp (usec) of the first error that occurred while trying to refresh the configuration.
+// Will be reset to 0 after a successful refresh.
+static uint64 first_recorded_refresh_error_usec = 0;
+
+// Request compute_ctl to refresh the configuration. This operation may fail, e.g., if the compute_ctl
+// is already in the configuration state. The function returns true if the caller needs to cancel the
+// current query to avoid dead/live lock.
+static bool
 hadron_request_configuration_refresh() {
     static CURL *handle = NULL;
     CURLcode res;
     char *compute_ctl_url;
+    bool cancel_query = false;
 
     if (!lakebase_mode)
-        return;
+        return false;
 
     if (handle == NULL)
     {
@@ -1073,9 +1083,38 @@ hadron_request_configuration_refresh() {
     curl_easy_setopt(handle, CURLOPT_URL, compute_ctl_url);
     res = curl_easy_perform(handle);
-    if (res != CURLE_OK)
+    if (res != CURLE_OK)
     {
-        elog(WARNING, "compute_ctl refresh_configuration request failed: %s\n", curl_easy_strerror(res));
+        elog(WARNING, "refresh_configuration request failed: %s\n", curl_easy_strerror(res));
+    }
+    else
+    {
+        long http_code = 0;
+        res = curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &http_code);
+        if (res != CURLE_OK)
+        {
+            elog(WARNING, "compute_ctl refresh_configuration request getinfo failed: %s\n", curl_easy_strerror(res));
+        }
+        else
+        {
+            elog(LOG, "compute_ctl refresh_configuration got HTTP response: %ld\n", http_code);
+            if (http_code == 200)
+            {
+                first_recorded_refresh_error_usec = 0;
+            }
+            else
+            {
+                if (first_recorded_refresh_error_usec == 0)
+                {
+                    first_recorded_refresh_error_usec = GetCurrentTimestamp();
+                }
+                else if (GetCurrentTimestamp() - first_recorded_refresh_error_usec > kRefreshErrorTimeoutUSec)
+                {
+                    first_recorded_refresh_error_usec = 0;
+                    cancel_query = true;
+                }
+            }
+        }
     }
 
     // In regular Postgres usage, it is not necessary to manually free memory allocated by palloc (psprintf) because
@@ -1086,6 +1127,7 @@ hadron_request_configuration_refresh() {
     {
         pfree(compute_ctl_url);
     }
+    return cancel_query;
 }
 // END HADRON
@@ -1123,8 +1165,10 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
     while (!pageserver_connect(shard_no, shard->n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
     {
         shard->n_reconnect_attempts += 1;
-        if (shard->n_reconnect_attempts > conf_refresh_reconnect_attempt_threshold) {
-            hadron_request_configuration_refresh();
+        if (shard->n_reconnect_attempts > conf_refresh_reconnect_attempt_threshold
+            && hadron_request_configuration_refresh())
+        {
+            neon_shard_log(shard_no, ERROR, "request failed too many times, cancelling query");
         }
     }
     shard->n_reconnect_attempts = 0;
@@ -1338,6 +1382,16 @@ pageserver_try_receive(shardno_t shard_no)
         neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc);
     }
 
+    /*
+     * Always poke compute_ctl to request a configuration refresh if we have issues receiving data from a pageserver after
+     * successfully connecting to it. It could be an indication that we are connecting to the wrong pageserver (e.g. the PS
+     * is in secondary mode or otherwise refuses to respond to our request).
+     */
+    if (rc < 0 && hadron_request_configuration_refresh())
+    {
+        neon_shard_log(shard_no, ERROR, "refresh_configuration request failed, cancelling query");
+    }
+
     shard->nresponses_received++;
     return (NeonResponse *) resp;
 }
diff --git a/test_runner/regress/test_compute_termination.py b/test_runner/regress/test_compute_termination.py
new file mode 100644
index 0000000000..2d62ccf20f
--- /dev/null
+++ b/test_runner/regress/test_compute_termination.py
@@ -0,0 +1,369 @@
+from __future__ import annotations
+
+import json
+import os
+import shutil
+import subprocess
+import threading
+import time
+from http.server import BaseHTTPRequestHandler, HTTPServer
+from typing import TYPE_CHECKING
+
+import requests
+from fixtures.log_helper import log
+from typing_extensions import override
+
+if TYPE_CHECKING:
+    from typing import Any
+
+    from fixtures.common_types import TenantId, TimelineId
+    from fixtures.neon_fixtures import NeonEnv
+    from fixtures.port_distributor import PortDistributor
+
+
+def launch_compute_ctl(
+    env: NeonEnv,
+    endpoint_name: str,
+    external_http_port: int,
+    internal_http_port: int,
+    pg_port: int,
+    control_plane_port: int,
+) -> subprocess.Popen[str]:
+    """
+    Helper function to launch a compute_ctl process with common configuration.
+    Returns the Popen process object.
+    """
+    # Create endpoint directory structure following the standard pattern
+    endpoint_path = env.repo_dir / "endpoints" / endpoint_name
+
+    # Clean up any existing endpoint directory to avoid conflicts
+    if endpoint_path.exists():
+        shutil.rmtree(endpoint_path)
+
+    endpoint_path.mkdir(mode=0o755, parents=True, exist_ok=True)
+
+    # pgdata path - compute_ctl will create this directory during basebackup
+    pgdata_path = endpoint_path / "pgdata"
+
+    # Create log file in endpoint directory
+    log_file = endpoint_path / "compute.log"
+    log_handle = open(log_file, "w")
+
+    # Start compute_ctl pointing to our control plane
+    compute_ctl_path = env.neon_binpath / "compute_ctl"
+    connstr = f"postgresql://cloud_admin@localhost:{pg_port}/postgres"
+
+    # Find postgres binary path
+    pg_bin_path = env.pg_distrib_dir / env.pg_version.v_prefixed / "bin" / "postgres"
+    pg_lib_path = env.pg_distrib_dir / env.pg_version.v_prefixed / "lib"
+
+    env_vars = {
+        "INSTANCE_ID": "lakebase-instance-id",
+        "LD_LIBRARY_PATH": str(pg_lib_path),  # Linux, etc.
+        "DYLD_LIBRARY_PATH": str(pg_lib_path),  # macOS
+    }
+
+    cmd = [
+        str(compute_ctl_path),
+        "--external-http-port",
+        str(external_http_port),
+        "--internal-http-port",
+        str(internal_http_port),
+        "--pgdata",
+        str(pgdata_path),
+        "--connstr",
+        connstr,
+        "--pgbin",
+        str(pg_bin_path),
+        "--compute-id",
+        endpoint_name,  # Use endpoint_name as compute-id
+        "--control-plane-uri",
+        f"http://127.0.0.1:{control_plane_port}",
+        "--lakebase-mode",
+        "true",
+    ]
+
+    print(f"Launching compute_ctl with command: {cmd}")
+
+    # Start compute_ctl
+    process = subprocess.Popen(
+        cmd,
+        env=env_vars,
+        stdout=log_handle,
+        stderr=subprocess.STDOUT,  # Combine stderr with stdout
+        text=True,
+    )
+
+    return process
+
+
+def wait_for_compute_status(
+    compute_process: subprocess.Popen[str],
+    http_port: int,
+    expected_status: str,
+    timeout_seconds: int = 10,
+) -> None:
+    """
+    Wait for compute_ctl to reach the expected status.
+    Raises an exception if the timeout is reached or the process exits unexpectedly.
+    """
+    start_time = time.time()
+    while time.time() - start_time < timeout_seconds:
+        try:
+            # Try to connect to the HTTP endpoint
+            response = requests.get(f"http://localhost:{http_port}/status", timeout=0.5)
+            if response.status_code == 200:
+                status_json = response.json()
+                # Check if it's in expected status
+                if status_json.get("status") == expected_status:
+                    return
+        except (requests.ConnectionError, requests.Timeout):
+            pass
+
+        # Check if process has exited
+        if compute_process.poll() is not None:
+            raise Exception(
+                f"compute_ctl exited unexpectedly with code {compute_process.returncode}."
+            )
+
+        time.sleep(0.5)
+
+    # Timeout reached
+    compute_process.terminate()
+    raise Exception(
+        f"compute_ctl failed to reach {expected_status} status within {timeout_seconds} seconds."
+    )
+
+
+class EmptySpecHandler(BaseHTTPRequestHandler):
+    """HTTP handler that returns an Empty compute spec response"""
+
+    def do_GET(self):
+        if self.path.startswith("/compute/api/v2/computes/") and self.path.endswith("/spec"):
+            # Return empty status which will put compute in Empty state
+            response: dict[str, Any] = {
+                "status": "empty",
+                "spec": None,
+                "compute_ctl_config": {"jwks": {"keys": []}},
+            }
+            self.send_response(200)
+            self.send_header("Content-Type", "application/json")
+            self.end_headers()
+            self.wfile.write(json.dumps(response).encode())
+        else:
+            self.send_error(404)
+
+    @override
+    def log_message(self, format: str, *args: Any):
+        # Suppress request logging
+        pass
+
+
+def test_compute_terminate_empty(neon_simple_env: NeonEnv, port_distributor: PortDistributor):
+    """
+    Test that terminating a compute in Empty status works correctly.
+
+    This tests the bug fix where terminating an Empty compute would hang
+    waiting for a non-existent postgres process to terminate.
+    """
+    env = neon_simple_env
+
+    # Get ports for our test
+    control_plane_port = port_distributor.get_port()
+    external_http_port = port_distributor.get_port()
+    internal_http_port = port_distributor.get_port()
+    pg_port = port_distributor.get_port()
+
+    # Start a simple HTTP server that will serve the Empty spec
+    server = HTTPServer(("127.0.0.1", control_plane_port), EmptySpecHandler)
+    server_thread = threading.Thread(target=server.serve_forever)
+    server_thread.daemon = True
+    server_thread.start()
+
+    compute_process = None
+    try:
+        # Start compute_ctl with ephemeral tenant ID
+        compute_process = launch_compute_ctl(
+            env,
+            "test-empty-compute",
+            external_http_port,
+            internal_http_port,
+            pg_port,
+            control_plane_port,
+        )
+
+        # Wait for compute_ctl to start and report "empty" status
+        wait_for_compute_status(compute_process, external_http_port, "empty")
+
+        # Now send terminate request
+        response = requests.post(f"http://localhost:{external_http_port}/terminate")
+
+        # Verify that the termination request receives a 200 OK response and is not abruptly terminated.
+        assert response.status_code == 200, (
+            f"Expected 200 OK, got {response.status_code}: {response.text}"
+        )
+
+        # Wait for compute_ctl to exit
+        exit_code = compute_process.wait(timeout=10)
+        assert exit_code == 0, f"compute_ctl exited with non-zero code: {exit_code}"
+
+    finally:
+        # Clean up
+        server.shutdown()
+        if compute_process and compute_process.poll() is None:
+            compute_process.terminate()
+            compute_process.wait()
+
+
+class SwitchableConfigHandler(BaseHTTPRequestHandler):
+    """HTTP handler that can switch between normal compute configs and compute configs without specs"""
+
+    return_empty_spec: bool = False
+    tenant_id: TenantId | None = None
+    timeline_id: TimelineId | None = None
+    pageserver_port: int | None = None
+    safekeeper_connstrs: list[str] | None = None
+
+    def do_GET(self):
+        if self.path.startswith("/compute/api/v2/computes/") and self.path.endswith("/spec"):
+            if self.return_empty_spec:
+                # Return empty status
+                response: dict[str, object | None] = {
+                    "status": "empty",
+                    "spec": None,
+                    "compute_ctl_config": {
+                        "jwks": {"keys": []},
+                    },
+                }
+            else:
+                # Return normal attached spec
+                response = {
+                    "status": "attached",
+                    "spec": {
+                        "format_version": 1.0,
+                        "cluster": {
+                            "roles": [],
+                            "databases": [],
+                            "postgresql_conf": "shared_preload_libraries='neon'",
+                        },
+                        "tenant_id": str(self.tenant_id) if self.tenant_id else "",
+                        "timeline_id": str(self.timeline_id) if self.timeline_id else "",
+                        "pageserver_connstring": f"postgres://no_user@localhost:{self.pageserver_port}"
+                        if self.pageserver_port
+                        else "",
+                        "safekeeper_connstrings": self.safekeeper_connstrs or [],
+                        "mode": "Primary",
+                        "skip_pg_catalog_updates": True,
+                        "reconfigure_concurrency": 1,
+                        "suspend_timeout_seconds": -1,
+                    },
+                    "compute_ctl_config": {
+                        "jwks": {"keys": []},
+                    },
+                }
+            self.send_response(200)
+            self.send_header("Content-Type", "application/json")
+            self.end_headers()
+            self.wfile.write(json.dumps(response).encode())
+        else:
+            self.send_error(404)
+
+    @override
+    def log_message(self, format: str, *args: Any):
+        # Suppress request logging
+        pass
+
+
+def test_compute_empty_spec_during_refresh_configuration(
+    neon_simple_env: NeonEnv, port_distributor: PortDistributor
+):
+    """
+    Test that compute exits when it receives an empty spec during refresh configuration state.
+
+    This test:
+    1. Start compute with a normal spec
+    2. Change the spec handler to return empty spec
+    3. Trigger some condition to force compute to refresh configuration
+    4. Verify that compute_ctl exits
+    """
+    env = neon_simple_env
+
+    # Get ports for our test
+    control_plane_port = port_distributor.get_port()
+    external_http_port = port_distributor.get_port()
+    internal_http_port = port_distributor.get_port()
+    pg_port = port_distributor.get_port()
+
+    # Set up handler class variables
+    SwitchableConfigHandler.tenant_id = env.initial_tenant
+    SwitchableConfigHandler.timeline_id = env.initial_timeline
+    SwitchableConfigHandler.pageserver_port = env.pageserver.service_port.pg
+    # Convert comma-separated string to list
+    safekeeper_connstrs = env.get_safekeeper_connstrs()
+    if safekeeper_connstrs:
+        SwitchableConfigHandler.safekeeper_connstrs = safekeeper_connstrs.split(",")
+    else:
+        SwitchableConfigHandler.safekeeper_connstrs = []
+    SwitchableConfigHandler.return_empty_spec = False  # Start with normal spec
+
+    # Start HTTP server with switchable spec handler
+    server = HTTPServer(("127.0.0.1", control_plane_port), SwitchableConfigHandler)
+    server_thread = threading.Thread(target=server.serve_forever)
+    server_thread.daemon = True
+    server_thread.start()
+
+    compute_process = None
+    try:
+        # Start compute_ctl with tenant and timeline IDs
+        # Use a unique endpoint name to avoid conflicts
+        endpoint_name = f"test-refresh-compute-{os.getpid()}"
+        compute_process = launch_compute_ctl(
+            env,
+            endpoint_name,
+            external_http_port,
+            internal_http_port,
+            pg_port,
+            control_plane_port,
+        )
+
+        # Wait for compute_ctl to start and report "running" status
+        wait_for_compute_status(compute_process, external_http_port, "running", timeout_seconds=30)
+
+        log.info("Compute is running. Now returning empty spec and trigger configuration refresh.")
+
+        # Switch spec fetch handler to return empty spec
+        SwitchableConfigHandler.return_empty_spec = True
+
+        # Trigger a configuration refresh
+        try:
+            requests.post(f"http://localhost:{internal_http_port}/refresh_configuration")
+        except requests.RequestException as e:
+            log.info(f"Call to /refresh_configuration failed: {e}")
+            log.info(
+                "Ignoring the error, assuming that compute_ctl is already refreshing or has exited"
+            )
+
+        # Wait for compute_ctl to exit (it should exit when it gets an empty spec during refresh)
+        exit_start_time = time.time()
+        while time.time() - exit_start_time < 30:
+            if compute_process.poll() is not None:
+                # Process exited
+                break
+            time.sleep(0.5)
+
+        # Verify that compute_ctl exited
+        exit_code = compute_process.poll()
+        if exit_code is None:
+            compute_process.terminate()
+            raise Exception("compute_ctl did not exit after receiving empty spec.")
+
+        # The exit code might not be 0 in this case since it's an unexpected termination
+        # but we mainly care that it did exit
+        assert exit_code is not None, "compute_ctl should have exited"
+
+    finally:
+        # Clean up
+        server.shutdown()
+        if compute_process and compute_process.poll() is None:
+            compute_process.terminate()
+            compute_process.wait()
diff --git a/test_runner/regress/test_hadron_ps_connectivity_metrics.py b/test_runner/regress/test_hadron_ps_connectivity_metrics.py
index 7590c1236c..ff1f37b634 100644
--- a/test_runner/regress/test_hadron_ps_connectivity_metrics.py
+++ b/test_runner/regress/test_hadron_ps_connectivity_metrics.py
@@ -5,6 +5,7 @@
 from fixtures.common_types import TenantShardId
 from fixtures.log_helper import log
 from fixtures.metrics import parse_metrics
 from fixtures.neon_fixtures import Endpoint, NeonEnvBuilder, NeonPageserver
+from requests.exceptions import ConnectionError
 
 # Helper function to attempt reconfiguration of the compute to point to a new pageserver. Note that in these tests,
@@ -75,8 +76,14 @@ def test_misrouted_to_secondary(
     assert read_misrouted_metric_value(secondary_ps) == 0
     assert read_request_error_metric_value(endpoint) == 0
     _attempt_reconfiguration(endpoint, new_pageserver_id=secondary_ps.id, timeout_sec=2.0)
-    assert read_misrouted_metric_value(secondary_ps) > 0, "PS metric not incremented"
-    assert read_request_error_metric_value(endpoint) > 0, "compute_ctl metric not incremented"
+    assert read_misrouted_metric_value(secondary_ps) > 0
+    try:
+        assert read_request_error_metric_value(endpoint) > 0
+    except ConnectionError:
+        # When configuring PG to use a misconfigured pageserver, PG will cancel the query after a certain number of failed
+        # reconfiguration attempts. This will cause compute_ctl to exit.
+        log.info("Cannot connect to PG, ignoring")
+        pass
 
 
 def test_misrouted_to_ps_not_hosting_tenant(
@@ -120,5 +127,11 @@
     assert read_misrouted_metric_value(non_hosting_ps) == 0
     assert read_request_error_metric_value(endpoint) == 0
     _attempt_reconfiguration(endpoint, new_pageserver_id=non_hosting_ps.id, timeout_sec=2.0)
-    assert read_misrouted_metric_value(non_hosting_ps) > 0, "PS metric not incremented"
-    assert read_request_error_metric_value(endpoint) > 0, "compute_ctl metric not incremented"
+    assert read_misrouted_metric_value(non_hosting_ps) > 0
+    try:
+        assert read_request_error_metric_value(endpoint) > 0
+    except ConnectionError:
+        # When configuring PG to use a misconfigured pageserver, PG will cancel the query after a certain number of failed
+        # reconfiguration attempts. This will cause compute_ctl to exit.
+        log.info("Cannot connect to PG, ignoring")
+        pass
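For reference, a minimal sketch of how the new `/refresh_configuration` endpoint and the new status values can be exercised by hand against a running compute_ctl, outside the test fixtures above. The port numbers and the locally running compute_ctl instance are assumptions; the endpoint paths and the status strings ("refresh-configuration-pending", "refresh-configuration", "running") come from the diff itself.

```python
# Hypothetical manual check against a locally running compute_ctl.
# Assumes the external HTTP API listens on port 3080 and the internal one on 3081.
import time

import requests

EXTERNAL = "http://localhost:3080"  # serves /status
INTERNAL = "http://localhost:3081"  # serves /refresh_configuration

# Ask compute_ctl to pull a fresh spec and reconfigure. Per the responses.rs comment,
# a 500 here means a refresh is already being applied (status "refresh-configuration").
resp = requests.post(f"{INTERNAL}/refresh_configuration", timeout=5)
print("refresh_configuration ->", resp.status_code)

# Poll /status and watch the compute move through
# refresh-configuration-pending -> refresh-configuration -> running.
for _ in range(30):
    status = requests.get(f"{EXTERNAL}/status", timeout=5).json().get("status")
    print("status:", status)
    if status == "running":
        break
    time.sleep(1)
```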