mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
Cancel PG query if stuck at refreshing configuration (#12717)
## Problem While configuring or reconfiguring PG due to PageServer movements, it's possible PG may get stuck if PageServer is moved around after fetching the spec from StorageController. ## Summary of changes To fix this issue, this PR introduces two changes: 1. Fail the PG query directly if the query cannot request configuration for certain number of times. 2. Introduce a new state `RefreshConfiguration` in compute tools to differentiate it from `RefreshConfigurationPending`. If compute tool is already in `RefreshConfiguration` state, then it will not accept new request configuration requests. ## How is this tested? Chaos testing. Co-authored-by: Chen Luo <chen.luo@databricks.com>
This commit is contained in:
@@ -54,11 +54,11 @@ stateDiagram-v2
|
||||
Running --> TerminationPendingImmediate : Requested termination
|
||||
Running --> ConfigurationPending : Received a /configure request with spec
|
||||
Running --> RefreshConfigurationPending : Received a /refresh_configuration request, compute node will pull a new spec and reconfigure
|
||||
RefreshConfigurationPending --> Running : Compute has been re-configured
|
||||
RefreshConfigurationPending --> RefreshConfiguration: Received compute spec and started configuration
|
||||
RefreshConfiguration --> Running : Compute has been re-configured
|
||||
RefreshConfiguration --> RefreshConfigurationPending : Configuration failed and to be retried
|
||||
TerminationPendingFast --> Terminated compute with 30s delay for cplane to inspect status
|
||||
TerminationPendingImmediate --> Terminated : Terminated compute immediately
|
||||
Running --> TerminationPending : Requested termination
|
||||
TerminationPending --> Terminated : Terminated compute
|
||||
Failed --> RefreshConfigurationPending : Received a /refresh_configuration request
|
||||
Failed --> [*] : Compute exited
|
||||
Terminated --> [*] : Compute exited
|
||||
|
||||
@@ -1994,6 +1994,7 @@ impl ComputeNode {
|
||||
// wait
|
||||
ComputeStatus::Init
|
||||
| ComputeStatus::Configuration
|
||||
| ComputeStatus::RefreshConfiguration
|
||||
| ComputeStatus::RefreshConfigurationPending
|
||||
| ComputeStatus::Empty => {
|
||||
state = self.state_changed.wait(state).unwrap();
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::fs::File;
|
||||
use std::thread;
|
||||
use std::{path::Path, sync::Arc};
|
||||
|
||||
use anyhow::Result;
|
||||
use compute_api::responses::{ComputeConfig, ComputeStatus};
|
||||
use tracing::{error, info, instrument};
|
||||
|
||||
@@ -13,6 +14,10 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
|
||||
info!("waiting for reconfiguration requests");
|
||||
loop {
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
/* BEGIN_HADRON */
|
||||
// RefreshConfiguration should only be used inside the loop
|
||||
assert_ne!(state.status, ComputeStatus::RefreshConfiguration);
|
||||
/* END_HADRON */
|
||||
|
||||
if compute.params.lakebase_mode {
|
||||
while state.status != ComputeStatus::ConfigurationPending
|
||||
@@ -54,53 +59,68 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
|
||||
info!(
|
||||
"compute node suspects its configuration is out of date, now refreshing configuration"
|
||||
);
|
||||
// Drop the lock guard here to avoid holding the lock while downloading spec from the control plane / HCC.
|
||||
// This is the only thread that can move compute_ctl out of the `RefreshConfigurationPending` state, so it
|
||||
state.set_status(ComputeStatus::RefreshConfiguration, &compute.state_changed);
|
||||
// Drop the lock guard here to avoid holding the lock while downloading config from the control plane / HCC.
|
||||
// This is the only thread that can move compute_ctl out of the `RefreshConfiguration` state, so it
|
||||
// is safe to drop the lock like this.
|
||||
drop(state);
|
||||
|
||||
let spec = if let Some(config_path) = &compute.params.config_path_test_only {
|
||||
// This path is only to make testing easier. In production we always get the spec from the HCC.
|
||||
info!(
|
||||
"reloading config.json from path: {}",
|
||||
config_path.to_string_lossy()
|
||||
);
|
||||
let path = Path::new(config_path);
|
||||
if let Ok(file) = File::open(path) {
|
||||
match serde_json::from_reader::<File, ComputeConfig>(file) {
|
||||
Ok(config) => config.spec,
|
||||
Err(e) => {
|
||||
error!("could not parse spec file: {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
error!(
|
||||
"could not open config file at path: {}",
|
||||
let get_config_result: anyhow::Result<ComputeConfig> =
|
||||
if let Some(config_path) = &compute.params.config_path_test_only {
|
||||
// This path is only to make testing easier. In production we always get the config from the HCC.
|
||||
info!(
|
||||
"reloading config.json from path: {}",
|
||||
config_path.to_string_lossy()
|
||||
);
|
||||
None
|
||||
}
|
||||
} else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
|
||||
match get_config_from_control_plane(control_plane_uri, &compute.params.compute_id) {
|
||||
Ok(config) => config.spec,
|
||||
Err(e) => {
|
||||
error!("could not get config from control plane: {}", e);
|
||||
None
|
||||
let path = Path::new(config_path);
|
||||
if let Ok(file) = File::open(path) {
|
||||
match serde_json::from_reader::<File, ComputeConfig>(file) {
|
||||
Ok(config) => Ok(config),
|
||||
Err(e) => {
|
||||
error!("could not parse config file: {}", e);
|
||||
Err(anyhow::anyhow!("could not parse config file: {}", e))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
error!(
|
||||
"could not open config file at path: {:?}",
|
||||
config_path.to_string_lossy()
|
||||
);
|
||||
Err(anyhow::anyhow!(
|
||||
"could not open config file at path: {}",
|
||||
config_path.to_string_lossy()
|
||||
))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
} else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
|
||||
get_config_from_control_plane(control_plane_uri, &compute.params.compute_id)
|
||||
} else {
|
||||
Err(anyhow::anyhow!("config_path_test_only is not set"))
|
||||
};
|
||||
|
||||
if let Some(spec) = spec {
|
||||
if let Ok(pspec) = ParsedSpec::try_from(spec) {
|
||||
// Parse any received ComputeSpec and transpose the result into a Result<Option<ParsedSpec>>.
|
||||
let parsed_spec_result: Result<Option<ParsedSpec>> =
|
||||
get_config_result.and_then(|config| {
|
||||
if let Some(spec) = config.spec {
|
||||
if let Ok(pspec) = ParsedSpec::try_from(spec) {
|
||||
Ok(Some(pspec))
|
||||
} else {
|
||||
Err(anyhow::anyhow!("could not parse spec"))
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
});
|
||||
|
||||
let new_status: ComputeStatus;
|
||||
match parsed_spec_result {
|
||||
// Control plane (HCM) returned a spec and we were able to parse it.
|
||||
Ok(Some(pspec)) => {
|
||||
{
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
// Defensive programming to make sure this thread is indeed the only one that can move the compute
|
||||
// node out of the `RefreshConfigurationPending` state. Would be nice if we can encode this invariant
|
||||
// node out of the `RefreshConfiguration` state. Would be nice if we can encode this invariant
|
||||
// into the type system.
|
||||
assert_eq!(state.status, ComputeStatus::RefreshConfigurationPending);
|
||||
assert_eq!(state.status, ComputeStatus::RefreshConfiguration);
|
||||
|
||||
if state.pspec.as_ref().map(|ps| ps.pageserver_connstr.clone())
|
||||
== Some(pspec.pageserver_connstr.clone())
|
||||
@@ -123,20 +143,45 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
|
||||
match compute.reconfigure() {
|
||||
Ok(_) => {
|
||||
info!("Refresh configuration: compute node configured");
|
||||
compute.set_status(ComputeStatus::Running);
|
||||
new_status = ComputeStatus::Running;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Refresh configuration: could not configure compute node: {}",
|
||||
e
|
||||
);
|
||||
// Leave the compute node in the `RefreshConfigurationPending` state if the configuration
|
||||
// Set the compute node back to the `RefreshConfigurationPending` state if the configuration
|
||||
// was not successful. It should be okay to treat this situation the same as if the loop
|
||||
// hasn't executed yet as long as the detection side keeps notifying.
|
||||
new_status = ComputeStatus::RefreshConfigurationPending;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Control plane (HCM)'s response does not contain a spec. This is the "Empty" attachment case.
|
||||
Ok(None) => {
|
||||
info!(
|
||||
"Compute Manager signaled that this compute is no longer attached to any storage. Exiting."
|
||||
);
|
||||
// We just immediately terminate the whole compute_ctl in this case. It's not necessary to attempt a
|
||||
// clean shutdown as Postgres is probably not responding anyway (which is why we are in this refresh
|
||||
// configuration state).
|
||||
std::process::exit(1);
|
||||
}
|
||||
// Various error cases:
|
||||
// - The request to the control plane (HCM) either failed or returned a malformed spec.
|
||||
// - compute_ctl itself is configured incorrectly (e.g., compute_id is not set).
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Refresh configuration: error getting a parsed spec: {:?}",
|
||||
e
|
||||
);
|
||||
new_status = ComputeStatus::RefreshConfigurationPending;
|
||||
// We may be dealing with an overloaded HCM if we end up in this path. Backoff 5 seconds before
|
||||
// retrying to avoid hammering the HCM.
|
||||
std::thread::sleep(std::time::Duration::from_secs(5));
|
||||
}
|
||||
}
|
||||
compute.set_status(new_status);
|
||||
} else if state.status == ComputeStatus::Failed {
|
||||
info!("compute node is now in Failed state, exiting");
|
||||
break;
|
||||
|
||||
@@ -938,7 +938,8 @@ impl Endpoint {
|
||||
| ComputeStatus::TerminationPendingFast
|
||||
| ComputeStatus::TerminationPendingImmediate
|
||||
| ComputeStatus::Terminated
|
||||
| ComputeStatus::RefreshConfigurationPending => {
|
||||
| ComputeStatus::RefreshConfigurationPending
|
||||
| ComputeStatus::RefreshConfiguration => {
|
||||
bail!("unexpected compute status: {:?}", state.status)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -174,6 +174,9 @@ pub enum ComputeStatus {
|
||||
Terminated,
|
||||
// A spec refresh is being requested
|
||||
RefreshConfigurationPending,
|
||||
// A spec refresh is being applied. We cannot refresh configuration again until the current
|
||||
// refresh is done, i.e., signal_refresh_configuration() will return 500 error.
|
||||
RefreshConfiguration,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize)]
|
||||
@@ -186,6 +189,10 @@ impl Display for ComputeStatus {
|
||||
match self {
|
||||
ComputeStatus::Empty => f.write_str("empty"),
|
||||
ComputeStatus::ConfigurationPending => f.write_str("configuration-pending"),
|
||||
ComputeStatus::RefreshConfiguration => f.write_str("refresh-configuration"),
|
||||
ComputeStatus::RefreshConfigurationPending => {
|
||||
f.write_str("refresh-configuration-pending")
|
||||
}
|
||||
ComputeStatus::Init => f.write_str("init"),
|
||||
ComputeStatus::Running => f.write_str("running"),
|
||||
ComputeStatus::Configuration => f.write_str("configuration"),
|
||||
@@ -195,9 +202,6 @@ impl Display for ComputeStatus {
|
||||
f.write_str("termination-pending-immediate")
|
||||
}
|
||||
ComputeStatus::Terminated => f.write_str("terminated"),
|
||||
ComputeStatus::RefreshConfigurationPending => {
|
||||
f.write_str("refresh-configuration-pending")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,6 +89,8 @@ static int pageserver_response_log_timeout = 10000;
|
||||
static int pageserver_response_disconnect_timeout = 150000;
|
||||
|
||||
static int conf_refresh_reconnect_attempt_threshold = 16;
|
||||
// Hadron: timeout for refresh errors (1 minute)
|
||||
static uint64 kRefreshErrorTimeoutUSec = 1 * USECS_PER_MINUTE;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
@@ -1046,14 +1048,22 @@ pageserver_disconnect_shard(shardno_t shard_no)
|
||||
|
||||
extern int hadron_extension_server_port;
|
||||
|
||||
static void
|
||||
// The timestamp (usec) of the first error that occurred while trying to refresh the configuration.
|
||||
// Will be reset to 0 after a successful refresh.
|
||||
static uint64 first_recorded_refresh_error_usec = 0;
|
||||
|
||||
// Request compute_ctl to refresh the configuration. This operation may fail, e.g., if the compute_ctl
|
||||
// is already in the configuration state. The function returns true if the caller needs to cancel the
|
||||
// current query to avoid dead/live lock.
|
||||
static bool
|
||||
hadron_request_configuration_refresh() {
|
||||
static CURL *handle = NULL;
|
||||
CURLcode res;
|
||||
char *compute_ctl_url;
|
||||
bool cancel_query = false;
|
||||
|
||||
if (!lakebase_mode)
|
||||
return;
|
||||
return false;
|
||||
|
||||
if (handle == NULL)
|
||||
{
|
||||
@@ -1073,9 +1083,40 @@ hadron_request_configuration_refresh() {
|
||||
curl_easy_setopt(handle, CURLOPT_URL, compute_ctl_url);
|
||||
|
||||
res = curl_easy_perform(handle);
|
||||
if (res != CURLE_OK)
|
||||
if (res != CURLE_OK )
|
||||
{
|
||||
elog(WARNING, "compute_ctl refresh_configuration request failed: %s\n", curl_easy_strerror(res));
|
||||
elog(WARNING, "refresh_configuration request failed: %s\n", curl_easy_strerror(res));
|
||||
}
|
||||
else
|
||||
{
|
||||
long http_code = 0;
|
||||
curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &http_code);
|
||||
if ( res != CURLE_OK )
|
||||
{
|
||||
elog(WARNING, "compute_ctl refresh_configuration request getinfo failed: %s\n", curl_easy_strerror(res));
|
||||
}
|
||||
else
|
||||
{
|
||||
elog(LOG, "compute_ctl refresh_configuration got HTTP response: %ld\n", http_code);
|
||||
if( http_code == 200 )
|
||||
{
|
||||
first_recorded_refresh_error_usec = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (first_recorded_refresh_error_usec == 0)
|
||||
{
|
||||
first_recorded_refresh_error_usec = GetCurrentTimestamp();
|
||||
}
|
||||
else if(GetCurrentTimestamp() - first_recorded_refresh_error_usec > kRefreshErrorTimeoutUSec)
|
||||
{
|
||||
{
|
||||
first_recorded_refresh_error_usec = 0;
|
||||
cancel_query = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// In regular Postgres usage, it is not necessary to manually free memory allocated by palloc (psprintf) because
|
||||
@@ -1086,6 +1127,7 @@ hadron_request_configuration_refresh() {
|
||||
{
|
||||
pfree(compute_ctl_url);
|
||||
}
|
||||
return cancel_query;
|
||||
}
|
||||
// END HADRON
|
||||
|
||||
@@ -1123,8 +1165,10 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
|
||||
while (!pageserver_connect(shard_no, shard->n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
|
||||
{
|
||||
shard->n_reconnect_attempts += 1;
|
||||
if (shard->n_reconnect_attempts > conf_refresh_reconnect_attempt_threshold) {
|
||||
hadron_request_configuration_refresh();
|
||||
if (shard->n_reconnect_attempts > conf_refresh_reconnect_attempt_threshold
|
||||
&& hadron_request_configuration_refresh() )
|
||||
{
|
||||
neon_shard_log(shard_no, ERROR, "request failed too many times, cancelling query");
|
||||
}
|
||||
}
|
||||
shard->n_reconnect_attempts = 0;
|
||||
@@ -1338,6 +1382,16 @@ pageserver_try_receive(shardno_t shard_no)
|
||||
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc);
|
||||
}
|
||||
|
||||
/*
|
||||
* Always poke compute_ctl to request a configuration refresh if we have issues receiving data from pageservers after
|
||||
* successfully connecting to it. It could be an indication that we are connecting to the wrong pageservers (e.g. PS
|
||||
* is in secondary mode or otherwise refuses to respond our request).
|
||||
*/
|
||||
if ( rc < 0 && hadron_request_configuration_refresh() )
|
||||
{
|
||||
neon_shard_log(shard_no, ERROR, "refresh_configuration request failed, cancelling query");
|
||||
}
|
||||
|
||||
shard->nresponses_received++;
|
||||
return (NeonResponse *) resp;
|
||||
}
|
||||
|
||||
369
test_runner/regress/test_compute_termination.py
Normal file
369
test_runner/regress/test_compute_termination.py
Normal file
@@ -0,0 +1,369 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import requests
|
||||
from fixtures.log_helper import log
|
||||
from typing_extensions import override
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Any
|
||||
|
||||
from fixtures.common_types import TenantId, TimelineId
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
|
||||
|
||||
def launch_compute_ctl(
|
||||
env: NeonEnv,
|
||||
endpoint_name: str,
|
||||
external_http_port: int,
|
||||
internal_http_port: int,
|
||||
pg_port: int,
|
||||
control_plane_port: int,
|
||||
) -> subprocess.Popen[str]:
|
||||
"""
|
||||
Helper function to launch compute_ctl process with common configuration.
|
||||
Returns the Popen process object.
|
||||
"""
|
||||
# Create endpoint directory structure following the standard pattern
|
||||
endpoint_path = env.repo_dir / "endpoints" / endpoint_name
|
||||
|
||||
# Clean up any existing endpoint directory to avoid conflicts
|
||||
if endpoint_path.exists():
|
||||
shutil.rmtree(endpoint_path)
|
||||
|
||||
endpoint_path.mkdir(mode=0o755, parents=True, exist_ok=True)
|
||||
|
||||
# pgdata path - compute_ctl will create this directory during basebackup
|
||||
pgdata_path = endpoint_path / "pgdata"
|
||||
|
||||
# Create log file in endpoint directory
|
||||
log_file = endpoint_path / "compute.log"
|
||||
log_handle = open(log_file, "w")
|
||||
|
||||
# Start compute_ctl pointing to our control plane
|
||||
compute_ctl_path = env.neon_binpath / "compute_ctl"
|
||||
connstr = f"postgresql://cloud_admin@localhost:{pg_port}/postgres"
|
||||
|
||||
# Find postgres binary path
|
||||
pg_bin_path = env.pg_distrib_dir / env.pg_version.v_prefixed / "bin" / "postgres"
|
||||
pg_lib_path = env.pg_distrib_dir / env.pg_version.v_prefixed / "lib"
|
||||
|
||||
env_vars = {
|
||||
"INSTANCE_ID": "lakebase-instance-id",
|
||||
"LD_LIBRARY_PATH": str(pg_lib_path), # Linux, etc.
|
||||
"DYLD_LIBRARY_PATH": str(pg_lib_path), # macOS
|
||||
}
|
||||
|
||||
cmd = [
|
||||
str(compute_ctl_path),
|
||||
"--external-http-port",
|
||||
str(external_http_port),
|
||||
"--internal-http-port",
|
||||
str(internal_http_port),
|
||||
"--pgdata",
|
||||
str(pgdata_path),
|
||||
"--connstr",
|
||||
connstr,
|
||||
"--pgbin",
|
||||
str(pg_bin_path),
|
||||
"--compute-id",
|
||||
endpoint_name, # Use endpoint_name as compute-id
|
||||
"--control-plane-uri",
|
||||
f"http://127.0.0.1:{control_plane_port}",
|
||||
"--lakebase-mode",
|
||||
"true",
|
||||
]
|
||||
|
||||
print(f"Launching compute_ctl with command: {cmd}")
|
||||
|
||||
# Start compute_ctl
|
||||
process = subprocess.Popen(
|
||||
cmd,
|
||||
env=env_vars,
|
||||
stdout=log_handle,
|
||||
stderr=subprocess.STDOUT, # Combine stderr with stdout
|
||||
text=True,
|
||||
)
|
||||
|
||||
return process
|
||||
|
||||
|
||||
def wait_for_compute_status(
|
||||
compute_process: subprocess.Popen[str],
|
||||
http_port: int,
|
||||
expected_status: str,
|
||||
timeout_seconds: int = 10,
|
||||
) -> None:
|
||||
"""
|
||||
Wait for compute_ctl to reach the expected status.
|
||||
Raises an exception if timeout is reached or process exits unexpectedly.
|
||||
"""
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < timeout_seconds:
|
||||
try:
|
||||
# Try to connect to the HTTP endpoint
|
||||
response = requests.get(f"http://localhost:{http_port}/status", timeout=0.5)
|
||||
if response.status_code == 200:
|
||||
status_json = response.json()
|
||||
# Check if it's in expected status
|
||||
if status_json.get("status") == expected_status:
|
||||
return
|
||||
except (requests.ConnectionError, requests.Timeout):
|
||||
pass
|
||||
|
||||
# Check if process has exited
|
||||
if compute_process.poll() is not None:
|
||||
raise Exception(
|
||||
f"compute_ctl exited unexpectedly with code {compute_process.returncode}."
|
||||
)
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
# Timeout reached
|
||||
compute_process.terminate()
|
||||
raise Exception(
|
||||
f"compute_ctl failed to reach {expected_status} status within {timeout_seconds} seconds."
|
||||
)
|
||||
|
||||
|
||||
class EmptySpecHandler(BaseHTTPRequestHandler):
|
||||
"""HTTP handler that returns an Empty compute spec response"""
|
||||
|
||||
def do_GET(self):
|
||||
if self.path.startswith("/compute/api/v2/computes/") and self.path.endswith("/spec"):
|
||||
# Return empty status which will put compute in Empty state
|
||||
response: dict[str, Any] = {
|
||||
"status": "empty",
|
||||
"spec": None,
|
||||
"compute_ctl_config": {"jwks": {"keys": []}},
|
||||
}
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
self.end_headers()
|
||||
self.wfile.write(json.dumps(response).encode())
|
||||
else:
|
||||
self.send_error(404)
|
||||
|
||||
@override
|
||||
def log_message(self, format: str, *args: Any):
|
||||
# Suppress request logging
|
||||
pass
|
||||
|
||||
|
||||
def test_compute_terminate_empty(neon_simple_env: NeonEnv, port_distributor: PortDistributor):
|
||||
"""
|
||||
Test that terminating a compute in Empty status works correctly.
|
||||
|
||||
This tests the bug fix where terminating an Empty compute would hang
|
||||
waiting for a non-existent postgres process to terminate.
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
# Get ports for our test
|
||||
control_plane_port = port_distributor.get_port()
|
||||
external_http_port = port_distributor.get_port()
|
||||
internal_http_port = port_distributor.get_port()
|
||||
pg_port = port_distributor.get_port()
|
||||
|
||||
# Start a simple HTTP server that will serve the Empty spec
|
||||
server = HTTPServer(("127.0.0.1", control_plane_port), EmptySpecHandler)
|
||||
server_thread = threading.Thread(target=server.serve_forever)
|
||||
server_thread.daemon = True
|
||||
server_thread.start()
|
||||
|
||||
compute_process = None
|
||||
try:
|
||||
# Start compute_ctl with ephemeral tenant ID
|
||||
compute_process = launch_compute_ctl(
|
||||
env,
|
||||
"test-empty-compute",
|
||||
external_http_port,
|
||||
internal_http_port,
|
||||
pg_port,
|
||||
control_plane_port,
|
||||
)
|
||||
|
||||
# Wait for compute_ctl to start and report "empty" status
|
||||
wait_for_compute_status(compute_process, external_http_port, "empty")
|
||||
|
||||
# Now send terminate request
|
||||
response = requests.post(f"http://localhost:{external_http_port}/terminate")
|
||||
|
||||
# Verify that the termination request sends back a 200 OK response and is not abruptly terminated.
|
||||
assert response.status_code == 200, (
|
||||
f"Expected 200 OK, got {response.status_code}: {response.text}"
|
||||
)
|
||||
|
||||
# Wait for compute_ctl to exit
|
||||
exit_code = compute_process.wait(timeout=10)
|
||||
assert exit_code == 0, f"compute_ctl exited with non-zero code: {exit_code}"
|
||||
|
||||
finally:
|
||||
# Clean up
|
||||
server.shutdown()
|
||||
if compute_process and compute_process.poll() is None:
|
||||
compute_process.terminate()
|
||||
compute_process.wait()
|
||||
|
||||
|
||||
class SwitchableConfigHandler(BaseHTTPRequestHandler):
|
||||
"""HTTP handler that can switch between normal compute configs and compute configs without specs"""
|
||||
|
||||
return_empty_spec: bool = False
|
||||
tenant_id: TenantId | None = None
|
||||
timeline_id: TimelineId | None = None
|
||||
pageserver_port: int | None = None
|
||||
safekeeper_connstrs: list[str] | None = None
|
||||
|
||||
def do_GET(self):
|
||||
if self.path.startswith("/compute/api/v2/computes/") and self.path.endswith("/spec"):
|
||||
if self.return_empty_spec:
|
||||
# Return empty status
|
||||
response: dict[str, object | None] = {
|
||||
"status": "empty",
|
||||
"spec": None,
|
||||
"compute_ctl_config": {
|
||||
"jwks": {"keys": []},
|
||||
},
|
||||
}
|
||||
else:
|
||||
# Return normal attached spec
|
||||
response = {
|
||||
"status": "attached",
|
||||
"spec": {
|
||||
"format_version": 1.0,
|
||||
"cluster": {
|
||||
"roles": [],
|
||||
"databases": [],
|
||||
"postgresql_conf": "shared_preload_libraries='neon'",
|
||||
},
|
||||
"tenant_id": str(self.tenant_id) if self.tenant_id else "",
|
||||
"timeline_id": str(self.timeline_id) if self.timeline_id else "",
|
||||
"pageserver_connstring": f"postgres://no_user@localhost:{self.pageserver_port}"
|
||||
if self.pageserver_port
|
||||
else "",
|
||||
"safekeeper_connstrings": self.safekeeper_connstrs or [],
|
||||
"mode": "Primary",
|
||||
"skip_pg_catalog_updates": True,
|
||||
"reconfigure_concurrency": 1,
|
||||
"suspend_timeout_seconds": -1,
|
||||
},
|
||||
"compute_ctl_config": {
|
||||
"jwks": {"keys": []},
|
||||
},
|
||||
}
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
self.end_headers()
|
||||
self.wfile.write(json.dumps(response).encode())
|
||||
else:
|
||||
self.send_error(404)
|
||||
|
||||
@override
|
||||
def log_message(self, format: str, *args: Any):
|
||||
# Suppress request logging
|
||||
pass
|
||||
|
||||
|
||||
def test_compute_empty_spec_during_refresh_configuration(
|
||||
neon_simple_env: NeonEnv, port_distributor: PortDistributor
|
||||
):
|
||||
"""
|
||||
Test that compute exits when it receives an empty spec during refresh configuration state.
|
||||
|
||||
This test:
|
||||
1. Start compute with a normal spec
|
||||
2. Change the spec handler to return empty spec
|
||||
3. Trigger some condition to force compute to refresh configuration
|
||||
4. Verify that compute_ctl exits
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
# Get ports for our test
|
||||
control_plane_port = port_distributor.get_port()
|
||||
external_http_port = port_distributor.get_port()
|
||||
internal_http_port = port_distributor.get_port()
|
||||
pg_port = port_distributor.get_port()
|
||||
|
||||
# Set up handler class variables
|
||||
SwitchableConfigHandler.tenant_id = env.initial_tenant
|
||||
SwitchableConfigHandler.timeline_id = env.initial_timeline
|
||||
SwitchableConfigHandler.pageserver_port = env.pageserver.service_port.pg
|
||||
# Convert comma-separated string to list
|
||||
safekeeper_connstrs = env.get_safekeeper_connstrs()
|
||||
if safekeeper_connstrs:
|
||||
SwitchableConfigHandler.safekeeper_connstrs = safekeeper_connstrs.split(",")
|
||||
else:
|
||||
SwitchableConfigHandler.safekeeper_connstrs = []
|
||||
SwitchableConfigHandler.return_empty_spec = False # Start with normal spec
|
||||
|
||||
# Start HTTP server with switchable spec handler
|
||||
server = HTTPServer(("127.0.0.1", control_plane_port), SwitchableConfigHandler)
|
||||
server_thread = threading.Thread(target=server.serve_forever)
|
||||
server_thread.daemon = True
|
||||
server_thread.start()
|
||||
|
||||
compute_process = None
|
||||
try:
|
||||
# Start compute_ctl with tenant and timeline IDs
|
||||
# Use a unique endpoint name to avoid conflicts
|
||||
endpoint_name = f"test-refresh-compute-{os.getpid()}"
|
||||
compute_process = launch_compute_ctl(
|
||||
env,
|
||||
endpoint_name,
|
||||
external_http_port,
|
||||
internal_http_port,
|
||||
pg_port,
|
||||
control_plane_port,
|
||||
)
|
||||
|
||||
# Wait for compute_ctl to start and report "running" status
|
||||
wait_for_compute_status(compute_process, external_http_port, "running", timeout_seconds=30)
|
||||
|
||||
log.info("Compute is running. Now returning empty spec and trigger configuration refresh.")
|
||||
|
||||
# Switch spec fetch handler to return empty spec
|
||||
SwitchableConfigHandler.return_empty_spec = True
|
||||
|
||||
# Trigger a configuration refresh
|
||||
try:
|
||||
requests.post(f"http://localhost:{internal_http_port}/refresh_configuration")
|
||||
except requests.RequestException as e:
|
||||
log.info(f"Call to /refresh_configuration failed: {e}")
|
||||
log.info(
|
||||
"Ignoring the error, assuming that compute_ctl is already refreshing or has exited"
|
||||
)
|
||||
|
||||
# Wait for compute_ctl to exit (it should exit when it gets an empty spec during refresh)
|
||||
exit_start_time = time.time()
|
||||
while time.time() - exit_start_time < 30:
|
||||
if compute_process.poll() is not None:
|
||||
# Process exited
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
# Verify that compute_ctl exited
|
||||
exit_code = compute_process.poll()
|
||||
if exit_code is None:
|
||||
compute_process.terminate()
|
||||
raise Exception("compute_ctl did not exit after receiving empty spec.")
|
||||
|
||||
# The exit code might not be 0 in this case since it's an unexpected termination
|
||||
# but we mainly care that it did exit
|
||||
assert exit_code is not None, "compute_ctl should have exited"
|
||||
|
||||
finally:
|
||||
# Clean up
|
||||
server.shutdown()
|
||||
if compute_process and compute_process.poll() is None:
|
||||
compute_process.terminate()
|
||||
compute_process.wait()
|
||||
@@ -5,6 +5,7 @@ from fixtures.common_types import TenantShardId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import parse_metrics
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnvBuilder, NeonPageserver
|
||||
from requests.exceptions import ConnectionError
|
||||
|
||||
|
||||
# Helper function to attempt reconfiguration of the compute to point to a new pageserver. Note that in these tests,
|
||||
@@ -75,8 +76,14 @@ def test_misrouted_to_secondary(
|
||||
assert read_misrouted_metric_value(secondary_ps) == 0
|
||||
assert read_request_error_metric_value(endpoint) == 0
|
||||
_attempt_reconfiguration(endpoint, new_pageserver_id=secondary_ps.id, timeout_sec=2.0)
|
||||
assert read_misrouted_metric_value(secondary_ps) > 0, "PS metric not incremented"
|
||||
assert read_request_error_metric_value(endpoint) > 0, "compute_ctl metric not incremented"
|
||||
assert read_misrouted_metric_value(secondary_ps) > 0
|
||||
try:
|
||||
assert read_request_error_metric_value(endpoint) > 0
|
||||
except ConnectionError:
|
||||
# When configuring PG to use misconfigured pageserver, PG will cancel the query after certain number of failed
|
||||
# reconfigure attempts. This will cause compute_ctl to exit.
|
||||
log.info("Cannot connect to PG, ignoring")
|
||||
pass
|
||||
|
||||
|
||||
def test_misrouted_to_ps_not_hosting_tenant(
|
||||
@@ -120,5 +127,11 @@ def test_misrouted_to_ps_not_hosting_tenant(
|
||||
assert read_misrouted_metric_value(non_hosting_ps) == 0
|
||||
assert read_request_error_metric_value(endpoint) == 0
|
||||
_attempt_reconfiguration(endpoint, new_pageserver_id=non_hosting_ps.id, timeout_sec=2.0)
|
||||
assert read_misrouted_metric_value(non_hosting_ps) > 0, "PS metric not incremented"
|
||||
assert read_request_error_metric_value(endpoint) > 0, "compute_ctl metric not incremented"
|
||||
assert read_misrouted_metric_value(non_hosting_ps) > 0
|
||||
try:
|
||||
assert read_request_error_metric_value(endpoint) > 0
|
||||
except ConnectionError:
|
||||
# When configuring PG to use misconfigured pageserver, PG will cancel the query after certain number of failed
|
||||
# reconfigure attempts. This will cause compute_ctl to exit.
|
||||
log.info("Cannot connect to PG, ignoring")
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user