mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 01:42:55 +00:00
WIP: convert compute_ctl to use background_process
This commit is contained in:
@@ -26,7 +26,7 @@
|
||||
//!
|
||||
//! ```text
|
||||
//! .neon/endpoints/main/
|
||||
//! compute.log - log output of `compute_ctl` and `postgres`
|
||||
//! compute_ctl.log - log output of `compute_ctl` and `postgres`
|
||||
//! endpoint.json - serialized `EndpointConf` struct
|
||||
//! postgresql.conf - postgresql settings
|
||||
//! spec.json - passed to `compute_ctl`
|
||||
@@ -45,6 +45,7 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
use compute_api::spec::RemoteExtSpec;
|
||||
use nix::sys::signal::kill;
|
||||
use nix::sys::signal::Signal;
|
||||
@@ -53,6 +54,7 @@ use url::Host;
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
|
||||
use crate::attachment_service::AttachmentService;
|
||||
use crate::background_process;
|
||||
use crate::local_env::LocalEnv;
|
||||
use crate::postgresql_conf::PostgresConf;
|
||||
|
||||
@@ -550,89 +552,84 @@ impl Endpoint {
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
|
||||
|
||||
// Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
|
||||
let logfile = std::fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(self.endpoint_path().join("compute.log"))?;
|
||||
|
||||
// Launch compute_ctl
|
||||
println!("Starting postgres node at '{}'", self.connstr());
|
||||
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
|
||||
cmd.args(["--http-port", &self.http_address.port().to_string()])
|
||||
.args(["--pgdata", self.pgdata().to_str().unwrap()])
|
||||
.args(["--connstr", &self.connstr()])
|
||||
.args([
|
||||
"--spec-path",
|
||||
self.endpoint_path().join("spec.json").to_str().unwrap(),
|
||||
])
|
||||
.args([
|
||||
"--pgbin",
|
||||
self.env
|
||||
.pg_bin_dir(self.pg_version)?
|
||||
.join("postgres")
|
||||
.to_str()
|
||||
.unwrap(),
|
||||
])
|
||||
.stdin(std::process::Stdio::null())
|
||||
.stderr(logfile.try_clone()?)
|
||||
.stdout(logfile);
|
||||
let pidfile_path: Utf8PathBuf = self
|
||||
.endpoint_path()
|
||||
.join("compute_ctl.pid")
|
||||
.try_into()
|
||||
.unwrap();
|
||||
|
||||
let mut args = vec![
|
||||
"--http-port".to_string(),
|
||||
self.http_address.port().to_string(),
|
||||
"--pgdata".to_string(),
|
||||
self.pgdata().to_str().unwrap().to_string(),
|
||||
"--connstr".to_string(),
|
||||
self.connstr().to_string(),
|
||||
"--spec-path".to_string(),
|
||||
self.endpoint_path()
|
||||
.join("spec.json")
|
||||
.to_str()
|
||||
.unwrap()
|
||||
.to_string(),
|
||||
"--pgbin".to_string(),
|
||||
self.env
|
||||
.pg_bin_dir(self.pg_version)?
|
||||
.join("postgres")
|
||||
.to_str()
|
||||
.unwrap()
|
||||
.to_string(),
|
||||
];
|
||||
if let Some(remote_ext_config) = remote_ext_config {
|
||||
cmd.args(["--remote-ext-config", remote_ext_config]);
|
||||
args.extend(["--remote-ext-config".to_string(), remote_ext_config.clone()]);
|
||||
}
|
||||
|
||||
let child = cmd.spawn()?;
|
||||
|
||||
// Write down the pid so we can wait for it when we want to stop
|
||||
// TODO use background_process::start_process instead
|
||||
let pid = child.id();
|
||||
let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
|
||||
std::fs::write(pidfile_path, pid.to_string())?;
|
||||
|
||||
// Wait for it to start
|
||||
let mut attempt = 0;
|
||||
const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
|
||||
const MAX_ATTEMPTS: u32 = 10 * 30; // Wait up to 30 s
|
||||
loop {
|
||||
attempt += 1;
|
||||
match self.get_status().await {
|
||||
Ok(state) => {
|
||||
match state.status {
|
||||
ComputeStatus::Init => {
|
||||
if attempt == MAX_ATTEMPTS {
|
||||
bail!("compute startup timed out; still in Init state");
|
||||
}
|
||||
// keep retrying
|
||||
}
|
||||
ComputeStatus::Running => {
|
||||
// All good!
|
||||
break;
|
||||
}
|
||||
ComputeStatus::Failed => {
|
||||
bail!(
|
||||
"compute startup failed: {}",
|
||||
state
|
||||
.error
|
||||
.as_deref()
|
||||
.unwrap_or("<no error from compute_ctl>")
|
||||
);
|
||||
}
|
||||
ComputeStatus::Empty
|
||||
| ComputeStatus::ConfigurationPending
|
||||
| ComputeStatus::Configuration => {
|
||||
bail!("unexpected compute status: {:?}", state.status)
|
||||
}
|
||||
background_process::start_process(
|
||||
"compute_ctl",
|
||||
&self.endpoint_path(),
|
||||
&self.env.neon_distrib_dir.join("compute_ctl"),
|
||||
args,
|
||||
[],
|
||||
background_process::InitialPidFile::Create(pidfile_path.clone()),
|
||||
|| async {
|
||||
let st = tokio::time::timeout(Duration::from_secs(1), self.get_status()).await;
|
||||
let Ok(st) = st else {
|
||||
// timeout, it's not up yet
|
||||
return Ok(false);
|
||||
};
|
||||
let Ok(state) = st else {
|
||||
// unspecified error
|
||||
return Ok(false);
|
||||
};
|
||||
match state.status {
|
||||
ComputeStatus::Init => {
|
||||
// keep retrying
|
||||
return Ok(false);
|
||||
}
|
||||
ComputeStatus::Running => {
|
||||
// All good!
|
||||
return Ok(true);
|
||||
}
|
||||
ComputeStatus::Failed => {
|
||||
bail!(
|
||||
"compute startup failed: {}",
|
||||
state
|
||||
.error
|
||||
.as_deref()
|
||||
.unwrap_or("<no error from compute_ctl>")
|
||||
);
|
||||
}
|
||||
ComputeStatus::Empty
|
||||
| ComputeStatus::ConfigurationPending
|
||||
| ComputeStatus::Configuration => {
|
||||
bail!("unexpected compute status: {:?}", state.status)
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
if attempt == MAX_ATTEMPTS {
|
||||
return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
|
||||
}
|
||||
}
|
||||
}
|
||||
std::thread::sleep(ATTEMPT_INTERVAL);
|
||||
}
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ def wait_caughtup(primary: Endpoint, secondary: Endpoint):
|
||||
# Check for corrupted WAL messages which might otherwise go unnoticed if
|
||||
# reconnection fixes this.
|
||||
def scan_standby_log_for_errors(secondary):
|
||||
log_path = secondary.endpoint_path() / "compute.log"
|
||||
log_path = secondary.endpoint_path() / "compute_ctl.log"
|
||||
with log_path.open("r") as f:
|
||||
markers = re.compile(
|
||||
r"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr"
|
||||
|
||||
@@ -8,7 +8,7 @@ def test_migrations(neon_simple_env: NeonEnv):
|
||||
env.neon_cli.create_branch("test_migrations", "empty")
|
||||
|
||||
endpoint = env.endpoints.create("test_migrations")
|
||||
log_path = endpoint.endpoint_path() / "compute.log"
|
||||
log_path = endpoint.endpoint_path() / "compute_ctl.log"
|
||||
|
||||
endpoint.respec(skip_pg_catalog_updates=False, features=["migrations"])
|
||||
endpoint.start()
|
||||
|
||||
Reference in New Issue
Block a user