Compare commits

...

9 Commits

Author SHA1 Message Date
Christian Schwarz
51f1671c6b WIP: convert compute_ctl to use background_process 2024-01-25 19:55:01 +00:00
Christian Schwarz
cf916b9be8 unnest the match, as per review request 2024-01-25 18:28:05 +00:00
Christian Schwarz
f79e4a3a0b if it's not a connectivity issue, bail 2024-01-25 16:40:11 +00:00
Christian Schwarz
9fae0d9562 using /v1/tenant endpoint is actually not necessary, we don't care about status 503 2024-01-25 15:48:54 +00:00
Christian Schwarz
50c4b83066 implement request timeouts using cancel-by-drop 2024-01-25 15:36:44 +00:00
Christian Schwarz
727412b094 Revert "send request with a timeout, extending mgmt_api crate to do that"
This reverts commit 73e86160f3.
2024-01-25 15:31:33 +00:00
Christian Schwarz
73e86160f3 send request with a timeout, extending mgmt_api crate to do that 2024-01-25 15:31:10 +00:00
Christian Schwarz
1648639874 fix(neon_local): long init_tenant_mgr causes pageserver startup failure
Before this PR, if neon_local's `start_process()` ran out of retries
before pageserver started listening for requests, it would give up.
As of PR #6474 we at least kill the starting pageserver process in that
case; before that, we would leak it.

Pageserver `bind()`s the mgmt API early, but only starts `accept()`ing
HTTP requests after it has finished `init_tenant_mgr()` (plus some other
stuff).

init_tenant_mgr can take a long time with many tenants, i.e., longer
than the retry window (retry count × interval) that neon_local permits.

Changes
=======

This PR changes the status check that neon_local performs when starting
pageserver to ignore connect & timeout errors, as those are expected
(see explanation above).

I verified that this allows for arbitrarily long `init_tenant_mgr()`
by adding a timeout at the top of that function.
2024-01-25 15:12:07 +00:00
Christian Schwarz
1dcb05c3d9 fix(neon_local): leaks child process if it fails to start & pass checks
Before this PR, if process_started() didn't return Ok(true) until we
ran out of retries, we'd return an error but leave the process running.

refs https://github.com/neondatabase/neon/issues/6473
2024-01-25 14:29:27 +00:00
9 changed files with 113 additions and 99 deletions

1
Cargo.lock generated
View File

@@ -1342,6 +1342,7 @@ dependencies = [
"regex",
"reqwest",
"safekeeper_api",
"scopeguard",
"serde",
"serde_json",
"serde_with",

View File

@@ -19,6 +19,7 @@ hex.workspace = true
hyper.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["blocking", "json"] }
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true

View File

@@ -9,7 +9,7 @@ use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use postgres_connection::parse_host_port;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{path::PathBuf, process::Child, str::FromStr};
use std::{path::PathBuf, str::FromStr};
use tracing::instrument;
use utils::{
auth::{Claims, Scope},
@@ -220,7 +220,7 @@ impl AttachmentService {
.expect("non-Unicode path")
}
pub async fn start(&self) -> anyhow::Result<Child> {
pub async fn start(&self) -> anyhow::Result<()> {
let path_str = self.path.to_string_lossy();
let mut args = vec!["-l", &self.listen, "-p", &path_str]
@@ -254,6 +254,7 @@ impl AttachmentService {
)
.await;
// TODO: shouldn't we bail if we fail to spawn the process?
for ps_conf in &self.env.pageservers {
let (pg_host, pg_port) =
parse_host_port(&ps_conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");

View File

@@ -17,7 +17,7 @@ use std::io::Write;
use std::os::unix::prelude::AsRawFd;
use std::os::unix::process::CommandExt;
use std::path::Path;
use std::process::{Child, Command};
use std::process::Command;
use std::time::Duration;
use std::{fs, io, thread};
@@ -60,7 +60,7 @@ pub async fn start_process<F, Fut, AI, A, EI>(
envs: EI,
initial_pid_file: InitialPidFile,
process_status_check: F,
) -> anyhow::Result<Child>
) -> anyhow::Result<()>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = anyhow::Result<bool>>,
@@ -98,7 +98,7 @@ where
InitialPidFile::Expect(path) => path,
};
let mut spawned_process = filled_cmd.spawn().with_context(|| {
let spawned_process = filled_cmd.spawn().with_context(|| {
format!("Could not spawn {process_name}, see console output and log files for details.")
})?;
let pid = spawned_process.id();
@@ -106,12 +106,26 @@ where
i32::try_from(pid)
.with_context(|| format!("Subprocess {process_name} has invalid pid {pid}"))?,
);
// set up a scopeguard to kill & wait for the child in case we panic or bail below
let spawned_process = scopeguard::guard(spawned_process, |mut spawned_process| {
println!("SIGKILL & wait the started process");
(|| {
// TODO: use another signal that can be caught by the child so it can clean up any children it spawned (e..g, walredo).
spawned_process.kill().context("SIGKILL child")?;
spawned_process.wait().context("wait() for child process")?;
anyhow::Ok(())
})()
.with_context(|| format!("scopeguard kill&wait child {process_name:?}"))
.unwrap();
});
for retries in 0..RETRIES {
match process_started(pid, pid_file_to_check, &process_status_check).await {
Ok(true) => {
println!("\n{process_name} started, pid: {pid}");
return Ok(spawned_process);
println!("\n{process_name} started and passed status check, pid: {pid}");
// leak the child process, it'll outlive this neon_local invocation
drop(scopeguard::ScopeGuard::into_inner(spawned_process));
return Ok(());
}
Ok(false) => {
if retries == NOTICE_AFTER_RETRIES {
@@ -126,16 +140,15 @@ where
thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
}
Err(e) => {
println!("{process_name} failed to start: {e:#}");
if let Err(e) = spawned_process.kill() {
println!("Could not stop {process_name} subprocess: {e:#}")
};
println!("error starting process {process_name:?}: {e:#}");
return Err(e);
}
}
}
println!();
anyhow::bail!("{process_name} did not start in {RETRY_UNTIL_SECS} seconds");
anyhow::bail!(
"{process_name} did not start+pass status checks within {RETRY_UNTIL_SECS} seconds"
);
}
/// Stops the process, using the pid file given. Returns Ok also if the process is already not running.

View File

@@ -26,7 +26,7 @@
//!
//! ```text
//! .neon/endpoints/main/
//! compute.log - log output of `compute_ctl` and `postgres`
//! compute_ctl.log - log output of `compute_ctl` and `postgres`
//! endpoint.json - serialized `EndpointConf` struct
//! postgresql.conf - postgresql settings
//! spec.json - passed to `compute_ctl`
@@ -45,6 +45,7 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result};
use camino::Utf8PathBuf;
use compute_api::spec::RemoteExtSpec;
use nix::sys::signal::kill;
use nix::sys::signal::Signal;
@@ -53,6 +54,7 @@ use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};
use crate::attachment_service::AttachmentService;
use crate::background_process;
use crate::local_env::LocalEnv;
use crate::postgresql_conf::PostgresConf;
@@ -550,89 +552,84 @@ impl Endpoint {
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
// Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
let logfile = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(self.endpoint_path().join("compute.log"))?;
// Launch compute_ctl
println!("Starting postgres node at '{}'", self.connstr());
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
cmd.args(["--http-port", &self.http_address.port().to_string()])
.args(["--pgdata", self.pgdata().to_str().unwrap()])
.args(["--connstr", &self.connstr()])
.args([
"--spec-path",
self.endpoint_path().join("spec.json").to_str().unwrap(),
])
.args([
"--pgbin",
self.env
.pg_bin_dir(self.pg_version)?
.join("postgres")
.to_str()
.unwrap(),
])
.stdin(std::process::Stdio::null())
.stderr(logfile.try_clone()?)
.stdout(logfile);
let pidfile_path: Utf8PathBuf = self
.endpoint_path()
.join("compute_ctl.pid")
.try_into()
.unwrap();
let mut args = vec![
"--http-port".to_string(),
self.http_address.port().to_string(),
"--pgdata".to_string(),
self.pgdata().to_str().unwrap().to_string(),
"--connstr".to_string(),
self.connstr().to_string(),
"--spec-path".to_string(),
self.endpoint_path()
.join("spec.json")
.to_str()
.unwrap()
.to_string(),
"--pgbin".to_string(),
self.env
.pg_bin_dir(self.pg_version)?
.join("postgres")
.to_str()
.unwrap()
.to_string(),
];
if let Some(remote_ext_config) = remote_ext_config {
cmd.args(["--remote-ext-config", remote_ext_config]);
args.extend(["--remote-ext-config".to_string(), remote_ext_config.clone()]);
}
let child = cmd.spawn()?;
// Write down the pid so we can wait for it when we want to stop
// TODO use background_process::start_process instead
let pid = child.id();
let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
std::fs::write(pidfile_path, pid.to_string())?;
// Wait for it to start
let mut attempt = 0;
const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
const MAX_ATTEMPTS: u32 = 10 * 30; // Wait up to 30 s
loop {
attempt += 1;
match self.get_status().await {
Ok(state) => {
match state.status {
ComputeStatus::Init => {
if attempt == MAX_ATTEMPTS {
bail!("compute startup timed out; still in Init state");
}
// keep retrying
}
ComputeStatus::Running => {
// All good!
break;
}
ComputeStatus::Failed => {
bail!(
"compute startup failed: {}",
state
.error
.as_deref()
.unwrap_or("<no error from compute_ctl>")
);
}
ComputeStatus::Empty
| ComputeStatus::ConfigurationPending
| ComputeStatus::Configuration => {
bail!("unexpected compute status: {:?}", state.status)
}
background_process::start_process(
"compute_ctl",
&self.endpoint_path(),
&self.env.neon_distrib_dir.join("compute_ctl"),
args,
[],
background_process::InitialPidFile::Create(pidfile_path.clone()),
|| async {
let st = tokio::time::timeout(Duration::from_secs(1), self.get_status()).await;
let Ok(st) = st else {
// timeout, it's not up yet
return Ok(false);
};
let Ok(state) = st else {
// unspecified error
return Ok(false);
};
match state.status {
ComputeStatus::Init => {
// keep retrying
return Ok(false);
}
ComputeStatus::Running => {
// All good!
return Ok(true);
}
ComputeStatus::Failed => {
bail!(
"compute startup failed: {}",
state
.error
.as_deref()
.unwrap_or("<no error from compute_ctl>")
);
}
ComputeStatus::Empty
| ComputeStatus::ConfigurationPending
| ComputeStatus::Configuration => {
bail!("unexpected compute status: {:?}", state.status)
}
}
Err(e) => {
if attempt == MAX_ATTEMPTS {
return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
}
}
}
std::thread::sleep(ATTEMPT_INTERVAL);
}
},
)
.await?;
Ok(())
}

View File

@@ -11,7 +11,7 @@ use std::io;
use std::io::Write;
use std::num::NonZeroU64;
use std::path::PathBuf;
use std::process::{Child, Command};
use std::process::Command;
use std::time::Duration;
use anyhow::{bail, Context};
@@ -161,7 +161,7 @@ impl PageServerNode {
.expect("non-Unicode path")
}
pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<Child> {
pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
self.start_node(config_overrides, false).await
}
@@ -207,7 +207,7 @@ impl PageServerNode {
&self,
config_overrides: &[&str],
update_config: bool,
) -> anyhow::Result<Child> {
) -> anyhow::Result<()> {
// TODO: using a thread here because start_process() is not async but we need to call check_status()
let datadir = self.repo_path();
print!(
@@ -236,11 +236,13 @@ impl PageServerNode {
self.pageserver_env_variables()?,
background_process::InitialPidFile::Expect(self.pid_file()),
|| async {
let st = self.check_status().await;
match st {
Ok(()) => Ok(true),
Err(mgmt_api::Error::ReceiveBody(_)) => Ok(false),
Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
let res =
tokio::time::timeout(Duration::from_secs(1), self.http_client.status()).await;
match res {
Ok(Ok(_)) => Ok(true),
Ok(Err(mgmt_api::Error::ReceiveBody(e))) if e.is_connect() => Ok(false),
Ok(Err(e)) => anyhow::bail!(e),
Err(_timeout) => Ok(false),
}
},
)

View File

@@ -7,7 +7,6 @@
//! ```
use std::io::Write;
use std::path::PathBuf;
use std::process::Child;
use std::{io, result};
use anyhow::Context;
@@ -104,7 +103,7 @@ impl SafekeeperNode {
.expect("non-Unicode path")
}
pub async fn start(&self, extra_opts: Vec<String>) -> anyhow::Result<Child> {
pub async fn start(&self, extra_opts: Vec<String>) -> anyhow::Result<()> {
print!(
"Starting safekeeper at '{}' in '{}'",
self.pg_connection_config.raw_address(),

View File

@@ -24,7 +24,7 @@ def wait_caughtup(primary: Endpoint, secondary: Endpoint):
# Check for corrupted WAL messages which might otherwise go unnoticed if
# reconnection fixes this.
def scan_standby_log_for_errors(secondary):
log_path = secondary.endpoint_path() / "compute.log"
log_path = secondary.endpoint_path() / "compute_ctl.log"
with log_path.open("r") as f:
markers = re.compile(
r"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr"

View File

@@ -8,7 +8,7 @@ def test_migrations(neon_simple_env: NeonEnv):
env.neon_cli.create_branch("test_migrations", "empty")
endpoint = env.endpoints.create("test_migrations")
log_path = endpoint.endpoint_path() / "compute.log"
log_path = endpoint.endpoint_path() / "compute_ctl.log"
endpoint.respec(skip_pg_catalog_updates=False, features=["migrations"])
endpoint.start()