Remove daemonize from storage components (#2677)

Move daemonization logic into `control_plane`.
Storage binaries now only create a lockfile, to prevent concurrent services from running in the same directory.
Kirill Bulatov
2022-11-02 02:26:37 +02:00
committed by GitHub
parent 6df4d5c911
commit d42700280f
37 changed files with 754 additions and 744 deletions
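The lockfile approach mentioned above relies on flock(2): the first process to take an exclusive non-blocking lock on its pid file wins, and a second instance started in the same directory fails fast instead of silently racing. A minimal standalone sketch of that idea, using the same `nix` calls as the new `utils::lock_file` module further down (the `service.pid` name and the `main` wrapper here are illustrative only, not code from this commit):

use std::{fs, os::unix::prelude::AsRawFd};

use nix::fcntl::{flock, FlockArg};

fn main() -> anyhow::Result<()> {
    // Hypothetical pid file name; each storage service uses its own data directory.
    let lock_path = "service.pid";
    let file = fs::OpenOptions::new()
        .create(true)
        .write(true)
        .open(lock_path)?;

    // LockExclusiveNonblock fails with EAGAIN when another process already
    // holds the lock, which is how a concurrent instance is detected.
    match flock(file.as_raw_fd(), FlockArg::LockExclusiveNonblock) {
        Ok(()) => {
            fs::write(lock_path, std::process::id().to_string())?;
            println!("lock acquired, running as pid {}", std::process::id());
            // ... run the service; the lock is released when `file` is closed.
            Ok(())
        }
        Err(nix::errno::Errno::EAGAIN) => {
            anyhow::bail!("another instance is already running in this directory")
        }
        Err(e) => Err(e.into()),
    }
}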

Cargo.lock (generated)
View File

@@ -317,12 +317,6 @@ dependencies = [
  "generic-array",
 ]

-[[package]]
-name = "boxfnonce"
-version = "0.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5988cb1d626264ac94100be357308f29ff7cbdd3b36bda27f450a4ee3f713426"
-
 [[package]]
 name = "bstr"
 version = "1.0.1"
@@ -850,16 +844,6 @@ dependencies = [
  "syn",
 ]

-[[package]]
-name = "daemonize"
-version = "0.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "70c24513e34f53b640819f0ac9f705b673fcf4006d7aab8778bee72ebfc89815"
-dependencies = [
- "boxfnonce",
- "libc",
-]
-
 [[package]]
 name = "darling"
 version = "0.14.1"
@@ -2141,7 +2125,6 @@ dependencies = [
  "crc32c",
  "criterion",
  "crossbeam-utils",
- "daemonize",
  "etcd_broker",
  "fail",
  "futures",
@@ -3088,7 +3071,6 @@ dependencies = [
  "clap 4.0.15",
  "const_format",
  "crc32c",
- "daemonize",
  "etcd_broker",
  "fs2",
  "git-version",
@@ -3096,6 +3078,7 @@ dependencies = [
  "humantime",
  "hyper",
  "metrics",
+ "nix 0.25.0",
  "once_cell",
  "parking_lot 0.12.1",
  "postgres",

View File

@@ -0,0 +1,264 @@
//! Spawns and kills background processes that are needed by Neon CLI.
//! Applies a common set-up such as log and pid files (if needed) to every process.
//!
//! Neon CLI does not run in the background, so it needs to store information about
//! the processes it spawns, which it does in this module.
//! We do that by storing the pid of the process in a "${process_name}.pid" file.
//! The pid file can be created by the process itself
//! (Neon storage binaries do that, and also take a lock on the file)
//! or by us after starting the process
//! (non-Neon binaries don't necessarily follow our pidfile conventions).
//! The pid stored in the file is later used to stop the service.
//!
//! See the [`lock_file`] module for more info.

use std::ffi::OsStr;
use std::io::Write;
use std::path::Path;
use std::process::{Child, Command};
use std::time::Duration;
use std::{fs, io, thread};

use anyhow::{anyhow, bail, Context, Result};
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use utils::lock_file;

const RETRIES: u32 = 15;
const RETRY_TIMEOUT_MILLIS: u64 = 500;

/// Argument to `start_process`, to indicate whether it should create the pidfile
/// or whether the process creates it itself.
pub enum InitialPidFile<'t> {
    /// Create a pidfile, to allow future CLI invocations to manipulate the process.
    Create(&'t Path),
    /// The process will create the pidfile itself; we need to wait for that event.
    Expect(&'t Path),
}

/// Start a background child process using the parameters given.
pub fn start_process<F, S: AsRef<OsStr>>(
    process_name: &str,
    datadir: &Path,
    command: &Path,
    args: &[S],
    initial_pid_file: InitialPidFile,
    process_status_check: F,
) -> anyhow::Result<Child>
where
    F: Fn() -> anyhow::Result<bool>,
{
    let log_path = datadir.join(format!("{process_name}.log"));
    let process_log_file = fs::OpenOptions::new()
        .create(true)
        .write(true)
        .append(true)
        .open(&log_path)
        .with_context(|| {
            format!("Could not open {process_name} log file {log_path:?} for writing")
        })?;
    let same_file_for_stderr = process_log_file.try_clone().with_context(|| {
        format!("Could not reuse {process_name} log file {log_path:?} for writing stderr")
    })?;

    let mut command = Command::new(command);
    let background_command = command
        .stdout(process_log_file)
        .stderr(same_file_for_stderr)
        .args(args);
    let filled_cmd = fill_aws_secrets_vars(fill_rust_env_vars(background_command));

    let mut spawned_process = filled_cmd.spawn().with_context(|| {
        format!("Could not spawn {process_name}, see console output and log files for details.")
    })?;
    let pid = spawned_process.id();
    let pid = Pid::from_raw(
        i32::try_from(pid)
            .with_context(|| format!("Subprocess {process_name} has invalid pid {pid}"))?,
    );

    let pid_file_to_check = match initial_pid_file {
        InitialPidFile::Create(target_pid_file_path) => {
            match lock_file::create_lock_file(target_pid_file_path, pid.to_string()) {
                lock_file::LockCreationResult::Created { .. } => {
                    // We use the "lock" file here only to create the pid file. The lock on the
                    // pidfile is dropped as soon as this CLI invocation exits, so it's a bit
                    // useless, but it doesn't do any harm either.
                }
                lock_file::LockCreationResult::AlreadyLocked { .. } => {
                    anyhow::bail!("Cannot write pid file for {process_name} at path {target_pid_file_path:?}: file is already locked by another process")
                }
                lock_file::LockCreationResult::CreationFailed(e) => {
                    return Err(e.context(format!(
                        "Failed to create pid file for {process_name} at path {target_pid_file_path:?}"
                    )))
                }
            }
            None
        }
        InitialPidFile::Expect(pid_file_path) => Some(pid_file_path),
    };

    for retries in 0..RETRIES {
        match process_started(pid, pid_file_to_check, &process_status_check) {
            Ok(true) => {
                println!("\n{process_name} started, pid: {pid}");
                return Ok(spawned_process);
            }
            Ok(false) => {
                if retries < 5 {
                    print!(".");
                    io::stdout().flush().unwrap();
                } else {
                    if retries == 5 {
                        println!() // put a line break after dots for the second message
                    }
                    println!("{process_name} has not started yet, retrying ({retries})...");
                }
                thread::sleep(Duration::from_millis(RETRY_TIMEOUT_MILLIS));
            }
            Err(e) => {
                println!("{process_name} failed to start: {e:#}");
                if let Err(e) = spawned_process.kill() {
                    println!("Could not stop {process_name} subprocess: {e:#}")
                };
                return Err(e);
            }
        }
    }
    anyhow::bail!("{process_name} could not start in {RETRIES} attempts");
}

/// Stops the process, using the pid file given. Also returns Ok if the process is already not running.
pub fn stop_process(immediate: bool, process_name: &str, pid_file: &Path) -> anyhow::Result<()> {
    if !pid_file.exists() {
        println!("{process_name} is already stopped: no pid file {pid_file:?} is present");
        return Ok(());
    }
    let pid = read_pidfile(pid_file)?;

    let sig = if immediate {
        print!("Stopping {process_name} with pid {pid} immediately..");
        Signal::SIGQUIT
    } else {
        print!("Stopping {process_name} with pid {pid} gracefully..");
        Signal::SIGTERM
    };
    io::stdout().flush().unwrap();
    match kill(pid, sig) {
        Ok(()) => (),
        Err(Errno::ESRCH) => {
            println!(
                "{process_name} with pid {pid} does not exist, but a pid file {pid_file:?} was found"
            );
            return Ok(());
        }
        Err(e) => anyhow::bail!("Failed to send signal to {process_name} with pid {pid}: {e}"),
    }

    // Wait until the process is gone
    for _ in 0..RETRIES {
        match process_has_stopped(pid) {
            Ok(true) => {
                println!("\n{process_name} stopped");
                if let Err(e) = fs::remove_file(pid_file) {
                    if e.kind() != io::ErrorKind::NotFound {
                        eprintln!("Failed to remove pid file {pid_file:?} after stopping the process: {e:#}");
                    }
                }
                return Ok(());
            }
            Ok(false) => {
                print!(".");
                io::stdout().flush().unwrap();
                thread::sleep(Duration::from_secs(1))
            }
            Err(e) => {
                println!("{process_name} with pid {pid} failed to stop: {e:#}");
                return Err(e);
            }
        }
    }
    anyhow::bail!("{process_name} with pid {pid} failed to stop in {RETRIES} attempts");
}

fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
    let mut filled_cmd = cmd.env_clear().env("RUST_BACKTRACE", "1");

    let var = "LLVM_PROFILE_FILE";
    if let Some(val) = std::env::var_os(var) {
        filled_cmd = filled_cmd.env(var, val);
    }

    const RUST_LOG_KEY: &str = "RUST_LOG";
    if let Ok(rust_log_value) = std::env::var(RUST_LOG_KEY) {
        filled_cmd.env(RUST_LOG_KEY, rust_log_value)
    } else {
        filled_cmd
    }
}

fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command {
    for env_key in [
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
        "AWS_SESSION_TOKEN",
    ] {
        if let Ok(value) = std::env::var(env_key) {
            cmd = cmd.env(env_key, value);
        }
    }
    cmd
}

fn process_started<F>(
    pid: Pid,
    pid_file_to_check: Option<&Path>,
    status_check: &F,
) -> anyhow::Result<bool>
where
    F: Fn() -> anyhow::Result<bool>,
{
    match status_check() {
        Ok(true) => match pid_file_to_check {
            Some(pid_file_path) => {
                if pid_file_path.exists() {
                    let pid_in_file = read_pidfile(pid_file_path)?;
                    Ok(pid_in_file == pid)
                } else {
                    Ok(false)
                }
            }
            None => Ok(true),
        },
        Ok(false) => Ok(false),
        Err(e) => anyhow::bail!("process failed to start: {e}"),
    }
}

/// Read a PID file
///
/// We expect a file that contains a single integer.
fn read_pidfile(pidfile: &Path) -> Result<Pid> {
    let pid_str = fs::read_to_string(pidfile)
        .with_context(|| format!("failed to read pidfile {pidfile:?}"))?;
    let pid: i32 = pid_str
        .parse()
        .map_err(|_| anyhow!("failed to parse pidfile {pidfile:?}"))?;
    if pid < 1 {
        bail!("pidfile {pidfile:?} contained bad value '{pid}'");
    }
    Ok(Pid::from_raw(pid))
}

fn process_has_stopped(pid: Pid) -> anyhow::Result<bool> {
    match kill(pid, None) {
        // Process exists, keep waiting
        Ok(_) => Ok(false),
        // Process not found, we're done
        Err(Errno::ESRCH) => Ok(true),
        Err(err) => anyhow::bail!("Failed to send signal to process with pid {pid}: {err}"),
    }
}
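For orientation, this is roughly how callers inside `control_plane` are expected to drive the API above — a hedged sketch, not code from this commit; `my_service`, its `--listen` flag, and `ping_http_api` are illustrative stand-ins:

use std::path::Path;
use std::process::Child;

use crate::background_process;

// Hypothetical readiness probe; real callers poll their service's HTTP status API.
fn ping_http_api() -> anyhow::Result<bool> {
    Ok(true)
}

fn start_my_service(datadir: &Path, binary: &Path) -> anyhow::Result<Child> {
    let pid_file = datadir.join("my_service.pid");
    background_process::start_process(
        "my_service",
        datadir,
        binary,
        &["--listen", "127.0.0.1:7676"],
        // The service creates and locks the pidfile itself, so wait for it to appear:
        background_process::InitialPidFile::Expect(&pid_file),
        ping_http_api,
    )
}

fn stop_my_service(datadir: &Path) -> anyhow::Result<()> {
    // `false` requests a graceful SIGTERM shutdown; `true` would send SIGQUIT.
    background_process::stop_process(false, "my_service", &datadir.join("my_service.pid"))
}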

View File

@@ -9,8 +9,8 @@ use anyhow::{anyhow, bail, Context, Result};
 use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
 use control_plane::compute::ComputeControlPlane;
 use control_plane::local_env::{EtcdBroker, LocalEnv};
+use control_plane::pageserver::PageServerNode;
 use control_plane::safekeeper::SafekeeperNode;
-use control_plane::storage::PageServerNode;
 use control_plane::{etcd, local_env};
 use pageserver_api::models::TimelineInfo;
 use pageserver_api::{

View File

@@ -18,8 +18,8 @@ use utils::{
 };

 use crate::local_env::{LocalEnv, DEFAULT_PG_VERSION};
+use crate::pageserver::PageServerNode;
 use crate::postgresql_conf::PostgresConf;
-use crate::storage::PageServerNode;

 //
 // ComputeControlPlane

View File

@@ -1,50 +1,22 @@
-use std::{
-    fs,
-    path::PathBuf,
-    process::{Command, Stdio},
-};
+use std::{fs, path::PathBuf};

 use anyhow::Context;
-use nix::{
-    sys::signal::{kill, Signal},
-    unistd::Pid,
-};

-use crate::{local_env, read_pidfile};
+use crate::{background_process, local_env};

 pub fn start_etcd_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
     let etcd_broker = &env.etcd_broker;
     println!(
-        "Starting etcd broker using {}",
-        etcd_broker.etcd_binary_path.display()
+        "Starting etcd broker using {:?}",
+        etcd_broker.etcd_binary_path
     );

     let etcd_data_dir = env.base_data_dir.join("etcd");
-    fs::create_dir_all(&etcd_data_dir).with_context(|| {
-        format!(
-            "Failed to create etcd data dir: {}",
-            etcd_data_dir.display()
-        )
-    })?;
-    let etcd_stdout_file =
-        fs::File::create(etcd_data_dir.join("etcd.stdout.log")).with_context(|| {
-            format!(
-                "Failed to create etcd stdout file in directory {}",
-                etcd_data_dir.display()
-            )
-        })?;
-    let etcd_stderr_file =
-        fs::File::create(etcd_data_dir.join("etcd.stderr.log")).with_context(|| {
-            format!(
-                "Failed to create etcd stderr file in directory {}",
-                etcd_data_dir.display()
-            )
-        })?;
+    fs::create_dir_all(&etcd_data_dir)
+        .with_context(|| format!("Failed to create etcd data dir {etcd_data_dir:?}"))?;

     let client_urls = etcd_broker.comma_separated_endpoints();
-
-    let etcd_process = Command::new(&etcd_broker.etcd_binary_path)
-        .args(&[
+    let args = [
         format!("--data-dir={}", etcd_data_dir.display()),
         format!("--listen-client-urls={client_urls}"),
         format!("--advertise-client-urls={client_urls}"),
@@ -56,44 +28,48 @@ pub fn start_etcd_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
         // enable it to prevent space exhaustion.
         "--auto-compaction-mode=revision".to_string(),
         "--auto-compaction-retention=1".to_string(),
-    ])
-    .stdout(Stdio::from(etcd_stdout_file))
-    .stderr(Stdio::from(etcd_stderr_file))
-    .spawn()
-    .context("Failed to spawn etcd subprocess")?;
-    let pid = etcd_process.id();
+    ];

-    let etcd_pid_file_path = etcd_pid_file_path(env);
-    fs::write(&etcd_pid_file_path, pid.to_string()).with_context(|| {
-        format!(
-            "Failed to create etcd pid file at {}",
-            etcd_pid_file_path.display()
-        )
-    })?;
+    let pid_file_path = etcd_pid_file_path(env);
+    let client = reqwest::blocking::Client::new();
+    background_process::start_process(
+        "etcd",
+        &etcd_data_dir,
+        &etcd_broker.etcd_binary_path,
+        &args,
+        background_process::InitialPidFile::Create(&pid_file_path),
+        || {
+            for broker_endpoint in &etcd_broker.broker_endpoints {
+                let request = broker_endpoint
+                    .join("health")
+                    .with_context(|| {
+                        format!(
+                            "Failed to append /health path to broker endpoint {}",
+                            broker_endpoint
+                        )
+                    })
+                    .and_then(|url| {
+                        client.get(&url.to_string()).build().with_context(|| {
+                            format!("Failed to construct request to etcd endpoint {url}")
+                        })
+                    })?;
+                if client.execute(request).is_ok() {
+                    return Ok(true);
+                }
+            }
+            Ok(false)
+        },
+    )
+    .context("Failed to spawn etcd subprocess")?;

     Ok(())
 }

 pub fn stop_etcd_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
-    let etcd_path = &env.etcd_broker.etcd_binary_path;
-    println!("Stopping etcd broker at {}", etcd_path.display());
-
-    let etcd_pid_file_path = etcd_pid_file_path(env);
-    let pid = Pid::from_raw(read_pidfile(&etcd_pid_file_path).with_context(|| {
-        format!(
-            "Failed to read etcd pid file at {}",
-            etcd_pid_file_path.display()
-        )
-    })?);
-    kill(pid, Signal::SIGTERM).with_context(|| {
-        format!(
-            "Failed to stop etcd with pid {pid} at {}",
-            etcd_pid_file_path.display()
-        )
-    })?;
-
-    Ok(())
+    background_process::stop_process(true, "etcd", &etcd_pid_file_path(env))
 }

 fn etcd_pid_file_path(env: &local_env::LocalEnv) -> PathBuf {
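Note that the closure passed to `start_process` above is the readiness check: returning `Ok(true)` completes start-up, `Ok(false)` means "keep polling" (up to RETRIES attempts), and `Err` aborts the start and kills the child. A tiny sketch of that contract with a hypothetical TCP probe in place of etcd's HTTP /health request:

use std::net::TcpStream;

// Hypothetical readiness probe: report the service as started once its listen
// address accepts a TCP connection; a refused connection means "retry later".
fn port_accepts(addr: &str) -> anyhow::Result<bool> {
    Ok(TcpStream::connect(addr).is_ok())
}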

View File

@@ -6,60 +6,12 @@
 // Intended to be used in integration tests and in CLI tools for
 // local installations.
 //
-use anyhow::{anyhow, bail, Context, Result};
-use std::fs;
-use std::path::Path;
-use std::process::Command;
+mod background_process;
 pub mod compute;
 pub mod connection;
 pub mod etcd;
 pub mod local_env;
+pub mod pageserver;
 pub mod postgresql_conf;
 pub mod safekeeper;
-pub mod storage;
-
-/// Read a PID file
-///
-/// We expect a file that contains a single integer.
-/// We return an i32 for compatibility with libc and nix.
-pub fn read_pidfile(pidfile: &Path) -> Result<i32> {
-    let pid_str = fs::read_to_string(pidfile)
-        .with_context(|| format!("failed to read pidfile {:?}", pidfile))?;
-    let pid: i32 = pid_str
-        .parse()
-        .map_err(|_| anyhow!("failed to parse pidfile {:?}", pidfile))?;
-    if pid < 1 {
-        bail!("pidfile {:?} contained bad value '{}'", pidfile, pid);
-    }
-    Ok(pid)
-}
-
-fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
-    let cmd = cmd.env_clear().env("RUST_BACKTRACE", "1");
-
-    let var = "LLVM_PROFILE_FILE";
-    if let Some(val) = std::env::var_os(var) {
-        cmd.env(var, val);
-    }
-
-    const RUST_LOG_KEY: &str = "RUST_LOG";
-    if let Ok(rust_log_value) = std::env::var(RUST_LOG_KEY) {
-        cmd.env(RUST_LOG_KEY, rust_log_value)
-    } else {
-        cmd
-    }
-}
-
-fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command {
-    for env_key in [
-        "AWS_ACCESS_KEY_ID",
-        "AWS_SECRET_ACCESS_KEY",
-        "AWS_SESSION_TOKEN",
-    ] {
-        if let Ok(value) = std::env::var(env_key) {
-            cmd = cmd.env(env_key, value);
-        }
-    }
-    cmd
-}

View File

@@ -226,12 +226,12 @@ impl LocalEnv {
         }
     }

-    pub fn pageserver_bin(&self) -> anyhow::Result<PathBuf> {
-        Ok(self.neon_distrib_dir.join("pageserver"))
+    pub fn pageserver_bin(&self) -> PathBuf {
+        self.neon_distrib_dir.join("pageserver")
     }

-    pub fn safekeeper_bin(&self) -> anyhow::Result<PathBuf> {
-        Ok(self.neon_distrib_dir.join("safekeeper"))
+    pub fn safekeeper_bin(&self) -> PathBuf {
+        self.neon_distrib_dir.join("safekeeper")
     }

     pub fn pg_data_dirs_path(&self) -> PathBuf {

View File

@@ -1,17 +1,13 @@
 use std::collections::HashMap;
-use std::fs::File;
+use std::fs::{self, File};
 use std::io::{BufReader, Write};
 use std::num::NonZeroU64;
 use std::path::{Path, PathBuf};
-use std::process::Command;
-use std::time::Duration;
-use std::{io, result, thread};
+use std::process::Child;
+use std::{io, result};

 use crate::connection::PgConnectionConfig;
 use anyhow::{bail, Context};
-use nix::errno::Errno;
-use nix::sys::signal::{kill, Signal};
-use nix::unistd::Pid;
 use pageserver_api::models::{
     TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo,
 };
@@ -25,8 +21,7 @@ use utils::{
     postgres_backend::AuthType,
 };

-use crate::local_env::LocalEnv;
-use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile};
+use crate::{background_process, local_env::LocalEnv};

 #[derive(Error, Debug)]
 pub enum PageserverHttpError {
@@ -160,7 +155,15 @@ impl PageServerNode {
             init_config_overrides.push("auth_validation_public_key_path='auth_public_key.pem'");
         }

-        self.start_node(&init_config_overrides, &self.env.base_data_dir, true)?;
+        let mut pageserver_process = self
+            .start_node(&init_config_overrides, &self.env.base_data_dir, true)
+            .with_context(|| {
+                format!(
+                    "Failed to start a process for pageserver {}",
+                    self.env.pageserver.id,
+                )
+            })?;

         let init_result = self
             .try_init_timeline(create_tenant, initial_timeline_id, pg_version)
             .context("Failed to create initial tenant and timeline for pageserver");
@@ -170,7 +173,29 @@ impl PageServerNode {
             }
             Err(e) => eprintln!("{e:#}"),
         }
-        self.stop(false)?;
+        match pageserver_process.kill() {
+            Err(e) => {
+                eprintln!(
+                    "Failed to stop pageserver {} process with pid {}: {e:#}",
+                    self.env.pageserver.id,
+                    pageserver_process.id(),
+                )
+            }
+            Ok(()) => {
+                println!(
+                    "Stopped pageserver {} process with pid {}",
+                    self.env.pageserver.id,
+                    pageserver_process.id(),
+                );
+                // cleanup after pageserver startup, since we do not call regular `stop_process` during init
+                let pid_file = self.pid_file();
+                if let Err(e) = fs::remove_file(&pid_file) {
+                    if e.kind() != io::ErrorKind::NotFound {
+                        eprintln!("Failed to remove pid file {pid_file:?} after stopping the process: {e:#}");
+                    }
+                }
+            }
+        }

         init_result
     }
@@ -195,11 +220,14 @@ impl PageServerNode {
         self.env.pageserver_data_dir()
     }

-    pub fn pid_file(&self) -> PathBuf {
+    /// The pid file is created by the pageserver process, with its pid stored inside.
+    /// Other pageservers cannot lock the same file and overwrite it for as long as the current
+    /// pageserver runs. (Unless someone removes the file manually; never do that!)
+    fn pid_file(&self) -> PathBuf {
         self.repo_path().join("pageserver.pid")
     }

-    pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
+    pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<Child> {
         self.start_node(config_overrides, &self.repo_path(), false)
     }
@@ -208,7 +236,7 @@ impl PageServerNode {
         config_overrides: &[&str],
         datadir: &Path,
         update_config: bool,
-    ) -> anyhow::Result<()> {
+    ) -> anyhow::Result<Child> {
         println!(
             "Starting pageserver at '{}' in '{}'",
             self.pg_connection_config.raw_address(),
@@ -219,10 +247,7 @@ impl PageServerNode {
         let mut args = vec![
             "-D",
             datadir.to_str().with_context(|| {
-                format!(
-                    "Datadir path '{}' cannot be represented as a unicode string",
-                    datadir.display()
-                )
+                format!("Datadir path {datadir:?} cannot be represented as a unicode string")
             })?,
         ];
@@ -234,48 +259,18 @@ impl PageServerNode {
             args.extend(["-c", config_override]);
         }

-        let mut cmd = Command::new(self.env.pageserver_bin()?);
-        let mut filled_cmd = fill_rust_env_vars(cmd.args(&args).arg("--daemonize"));
-        filled_cmd = fill_aws_secrets_vars(filled_cmd);
-
-        if !filled_cmd.status()?.success() {
-            bail!(
-                "Pageserver failed to start. See console output and '{}' for details.",
-                datadir.join("pageserver.log").display()
-            );
-        }
-
-        // It takes a while for the page server to start up. Wait until it is
-        // open for business.
-        const RETRIES: i8 = 15;
-        for retries in 1..RETRIES {
-            match self.check_status() {
-                Ok(()) => {
-                    println!("\nPageserver started");
-                    return Ok(());
-                }
-                Err(err) => {
-                    match err {
-                        PageserverHttpError::Transport(err) => {
-                            if err.is_connect() && retries < 5 {
-                                print!(".");
-                                io::stdout().flush().unwrap();
-                            } else {
-                                if retries == 5 {
-                                    println!() // put a line break after dots for second message
-                                }
-                                println!("Pageserver not responding yet, err {err} retrying ({retries})...");
-                            }
-                        }
-                        PageserverHttpError::Response(msg) => {
-                            bail!("pageserver failed to start: {msg} ")
-                        }
-                    }
-                    thread::sleep(Duration::from_secs(1));
-                }
-            }
-        }
-        bail!("pageserver failed to start in {RETRIES} seconds");
+        background_process::start_process(
+            "pageserver",
+            datadir,
+            &self.env.pageserver_bin(),
+            &args,
+            background_process::InitialPidFile::Expect(&self.pid_file()),
+            || match self.check_status() {
+                Ok(()) => Ok(true),
+                Err(PageserverHttpError::Transport(_)) => Ok(false),
+                Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
+            },
+        )
     }
@@ -287,58 +282,7 @@ impl PageServerNode {
     /// If the server is not running, returns success
     ///
     pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
-        let pid_file = self.pid_file();
-        if !pid_file.exists() {
-            println!("Pageserver is already stopped");
-            return Ok(());
-        }
-        let pid = Pid::from_raw(read_pidfile(&pid_file)?);
-
-        let sig = if immediate {
-            print!("Stopping pageserver immediately..");
-            Signal::SIGQUIT
-        } else {
-            print!("Stopping pageserver gracefully..");
-            Signal::SIGTERM
-        };
-        io::stdout().flush().unwrap();
-        match kill(pid, sig) {
-            Ok(_) => (),
-            Err(Errno::ESRCH) => {
-                println!("Pageserver with pid {pid} does not exist, but a PID file was found");
-                return Ok(());
-            }
-            Err(err) => bail!(
-                "Failed to send signal to pageserver with pid {pid}: {}",
-                err.desc()
-            ),
-        }
-
-        // Wait until process is gone
-        for i in 0..600 {
-            let signal = None; // Send no signal, just get the error code
-            match kill(pid, signal) {
-                Ok(_) => (), // Process exists, keep waiting
-                Err(Errno::ESRCH) => {
-                    // Process not found, we're done
-                    println!("done!");
-                    return Ok(());
-                }
-                Err(err) => bail!(
-                    "Failed to send signal to pageserver with pid {}: {}",
-                    pid,
-                    err.desc()
-                ),
-            };
-            if i % 10 == 0 {
-                print!(".");
-                io::stdout().flush().unwrap();
-            }
-            thread::sleep(Duration::from_millis(100));
-        }
-        bail!("Failed to stop pageserver with pid {pid}");
+        background_process::stop_process(immediate, "pageserver", &self.pid_file())
     }

     pub fn page_server_psql(&self, sql: &str) -> Vec<postgres::SimpleQueryMessage> {

View File

@@ -1,23 +1,21 @@
 use std::io::Write;
 use std::path::PathBuf;
-use std::process::Command;
+use std::process::Child;
 use std::sync::Arc;
-use std::time::Duration;
-use std::{io, result, thread};
+use std::{io, result};

-use anyhow::bail;
-use nix::errno::Errno;
-use nix::sys::signal::{kill, Signal};
-use nix::unistd::Pid;
+use anyhow::Context;
 use reqwest::blocking::{Client, RequestBuilder, Response};
 use reqwest::{IntoUrl, Method};
 use thiserror::Error;
 use utils::{http::error::HttpErrorBody, id::NodeId};

 use crate::connection::PgConnectionConfig;
-use crate::local_env::{LocalEnv, SafekeeperConf};
-use crate::storage::PageServerNode;
-use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile};
+use crate::pageserver::PageServerNode;
+use crate::{
+    background_process,
+    local_env::{LocalEnv, SafekeeperConf},
+};

 #[derive(Error, Debug)]
 pub enum SafekeeperHttpError {
@@ -95,7 +93,7 @@ impl SafekeeperNode {
     }

     pub fn datadir_path_by_id(env: &LocalEnv, sk_id: NodeId) -> PathBuf {
-        env.safekeeper_data_dir(format!("sk{}", sk_id).as_ref())
+        env.safekeeper_data_dir(&format!("sk{sk_id}"))
     }

     pub fn datadir_path(&self) -> PathBuf {
@@ -106,7 +104,7 @@ impl SafekeeperNode {
         self.datadir_path().join("safekeeper.pid")
     }

-    pub fn start(&self) -> anyhow::Result<()> {
+    pub fn start(&self) -> anyhow::Result<Child> {
         print!(
             "Starting safekeeper at '{}' in '{}'",
             self.pg_connection_config.raw_address(),
@@ -116,81 +114,68 @@ impl SafekeeperNode {
         let listen_pg = format!("127.0.0.1:{}", self.conf.pg_port);
         let listen_http = format!("127.0.0.1:{}", self.conf.http_port);
+        let id = self.id;
+        let datadir = self.datadir_path();

-        let mut cmd = Command::new(self.env.safekeeper_bin()?);
-        fill_rust_env_vars(
-            cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
-                .args(&["--id", self.id.to_string().as_ref()])
-                .args(&["--listen-pg", &listen_pg])
-                .args(&["--listen-http", &listen_http])
-                .arg("--daemonize"),
-        );
+        let id_string = id.to_string();
+        let mut args = vec![
+            "-D",
+            datadir.to_str().with_context(|| {
+                format!("Datadir path {datadir:?} cannot be represented as a unicode string")
+            })?,
+            "--id",
+            &id_string,
+            "--listen-pg",
+            &listen_pg,
+            "--listen-http",
+            &listen_http,
+        ];
         if !self.conf.sync {
-            cmd.arg("--no-sync");
+            args.push("--no-sync");
         }

         let comma_separated_endpoints = self.env.etcd_broker.comma_separated_endpoints();
         if !comma_separated_endpoints.is_empty() {
-            cmd.args(&["--broker-endpoints", &comma_separated_endpoints]);
+            args.extend(["--broker-endpoints", &comma_separated_endpoints]);
         }
         if let Some(prefix) = self.env.etcd_broker.broker_etcd_prefix.as_deref() {
-            cmd.args(&["--broker-etcd-prefix", prefix]);
+            args.extend(["--broker-etcd-prefix", prefix]);
         }

+        let mut backup_threads = String::new();
         if let Some(threads) = self.conf.backup_threads {
-            cmd.args(&["--backup-threads", threads.to_string().as_ref()]);
+            backup_threads = threads.to_string();
+            args.extend(["--backup-threads", &backup_threads]);
+        } else {
+            drop(backup_threads);
         }
-        if let Some(ref remote_storage) = self.conf.remote_storage {
-            cmd.args(&["--remote-storage", remote_storage]);
-        }
-        if self.conf.auth_enabled {
-            cmd.arg("--auth-validation-public-key-path");
-            // PathBuf is better be passed as is, not via `String`.
-            cmd.arg(self.env.base_data_dir.join("auth_public_key.pem"));
-        }
-        fill_aws_secrets_vars(&mut cmd);

-        if !cmd.status()?.success() {
-            bail!(
-                "Safekeeper failed to start. See '{}' for details.",
-                self.datadir_path().join("safekeeper.log").display()
-            );
-        }
+        if let Some(ref remote_storage) = self.conf.remote_storage {
+            args.extend(["--remote-storage", remote_storage]);
+        }
+        let key_path = self.env.base_data_dir.join("auth_public_key.pem");
+        if self.conf.auth_enabled {
+            args.extend([
+                "--auth-validation-public-key-path",
+                key_path.to_str().with_context(|| {
+                    format!("Key path {key_path:?} cannot be represented as a unicode string")
+                })?,
+            ]);
+        }

-        // It takes a while for the safekeeper to start up. Wait until it is
-        // open for business.
-        const RETRIES: i8 = 15;
-        for retries in 1..RETRIES {
-            match self.check_status() {
-                Ok(_) => {
-                    println!("\nSafekeeper started");
-                    return Ok(());
-                }
-                Err(err) => {
-                    match err {
-                        SafekeeperHttpError::Transport(err) => {
-                            if err.is_connect() && retries < 5 {
-                                print!(".");
-                                io::stdout().flush().unwrap();
-                            } else {
-                                if retries == 5 {
-                                    println!() // put a line break after dots for second message
-                                }
-                                println!(
-                                    "Safekeeper not responding yet, err {} retrying ({})...",
-                                    err, retries
-                                );
-                            }
-                        }
-                        SafekeeperHttpError::Response(msg) => {
-                            bail!("safekeeper failed to start: {} ", msg)
-                        }
-                    }
-                    thread::sleep(Duration::from_secs(1));
-                }
-            }
-        }
-        bail!("safekeeper failed to start in {} seconds", RETRIES);
+        background_process::start_process(
+            &format!("safekeeper {id}"),
+            &datadir,
+            &self.env.safekeeper_bin(),
+            &args,
+            background_process::InitialPidFile::Expect(&self.pid_file()),
+            || match self.check_status() {
+                Ok(()) => Ok(true),
+                Err(SafekeeperHttpError::Transport(_)) => Ok(false),
+                Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
+            },
+        )
     }
@@ -202,63 +187,11 @@ impl SafekeeperNode {
     /// If the server is not running, returns success
     ///
     pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
-        let pid_file = self.pid_file();
-        if !pid_file.exists() {
-            println!("Safekeeper {} is already stopped", self.id);
-            return Ok(());
-        }
-        let pid = read_pidfile(&pid_file)?;
-        let pid = Pid::from_raw(pid);
-
-        let sig = if immediate {
-            print!("Stopping safekeeper {} immediately..", self.id);
-            Signal::SIGQUIT
-        } else {
-            print!("Stopping safekeeper {} gracefully..", self.id);
-            Signal::SIGTERM
-        };
-        io::stdout().flush().unwrap();
-        match kill(pid, sig) {
-            Ok(_) => (),
-            Err(Errno::ESRCH) => {
-                println!(
-                    "Safekeeper with pid {} does not exist, but a PID file was found",
-                    pid
-                );
-                return Ok(());
-            }
-            Err(err) => bail!(
-                "Failed to send signal to safekeeper with pid {}: {}",
-                pid,
-                err.desc()
-            ),
-        }
-
-        // Wait until process is gone
-        for i in 0..600 {
-            let signal = None; // Send no signal, just get the error code
-            match kill(pid, signal) {
-                Ok(_) => (), // Process exists, keep waiting
-                Err(Errno::ESRCH) => {
-                    // Process not found, we're done
-                    println!("done!");
-                    return Ok(());
-                }
-                Err(err) => bail!(
-                    "Failed to send signal to pageserver with pid {}: {}",
-                    pid,
-                    err.desc()
-                ),
-            };
-            if i % 10 == 0 {
-                print!(".");
-                io::stdout().flush().unwrap();
-            }
-            thread::sleep(Duration::from_millis(100));
-        }
-        bail!("Failed to stop safekeeper with pid {}", pid);
+        background_process::stop_process(
+            immediate,
+            &format!("safekeeper {}", self.id),
+            &self.pid_file(),
+        )
     }

     fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {

View File

@@ -36,6 +36,8 @@ pub mod sock_split;
 // common log initialisation routine
 pub mod logging;

+pub mod lock_file;
+
 // Misc
 pub mod accum;
 pub mod shutdown;

View File

@@ -0,0 +1,81 @@
//! A module to create and read lock files. A lock file ensures that only one
//! process is running at a time in a particular directory.
//!
//! File locking is done using [`fcntl::flock`], which means that holding the
//! lock on a file only prevents acquiring another lock on it; all other
//! operations are still possible. Other processes can still open, read,
//! write, or remove the file, for example.
//! If the file is removed while a process is holding a lock on it,
//! the process that holds the lock does not get any error or notification.
//! Furthermore, you can create a new file with the same name and lock the new file,
//! while the old process is still running.
//! Deleting the lock file while the locking process is still running is a bad idea!

use std::{fs, os::unix::prelude::AsRawFd, path::Path};

use anyhow::Context;
use nix::fcntl;

use crate::crashsafe;

pub enum LockCreationResult {
    Created {
        new_lock_contents: String,
        file: fs::File,
    },
    AlreadyLocked {
        existing_lock_contents: String,
    },
    CreationFailed(anyhow::Error),
}

/// Creates a lock file at the path given, and writes the given contents into the file.
/// Note: the lock is automatically released when the file is closed. You might want to use
/// `Box::leak` to make sure it lives until the end of the program.
pub fn create_lock_file(lock_file_path: &Path, contents: String) -> LockCreationResult {
    let lock_file = match fs::OpenOptions::new()
        .create(true) // O_CREAT
        .write(true)
        .open(lock_file_path)
        .context("Failed to open lock file")
    {
        Ok(file) => file,
        Err(e) => return LockCreationResult::CreationFailed(e),
    };
    match fcntl::flock(
        lock_file.as_raw_fd(),
        fcntl::FlockArg::LockExclusiveNonblock,
    ) {
        Ok(()) => {
            match lock_file
                .set_len(0)
                .context("Failed to truncate lockfile")
                .and_then(|()| {
                    fs::write(lock_file_path, &contents).with_context(|| {
                        format!("Failed to write '{contents}' contents into lockfile")
                    })
                })
                .and_then(|()| {
                    crashsafe::fsync_file_and_parent(lock_file_path)
                        .context("Failed to fsync lockfile")
                }) {
                Ok(()) => LockCreationResult::Created {
                    new_lock_contents: contents,
                    file: lock_file,
                },
                Err(e) => LockCreationResult::CreationFailed(e),
            }
        }
        Err(nix::errno::Errno::EAGAIN) => {
            match fs::read_to_string(lock_file_path).context("Failed to read lockfile contents") {
                Ok(existing_lock_contents) => LockCreationResult::AlreadyLocked {
                    existing_lock_contents,
                },
                Err(e) => LockCreationResult::CreationFailed(e),
            }
        }
        Err(e) => {
            LockCreationResult::CreationFailed(anyhow::anyhow!("Failed to lock lockfile: {e}"))
        }
    }
}
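As the note on `create_lock_file` says, a process that should hold the lock for its whole lifetime can leak the file handle so the descriptor is never closed — a minimal sketch of that pattern (the pageserver start-up code later in this diff does exactly this; `service.pid` is an illustrative name):

use utils::lock_file::{create_lock_file, LockCreationResult};

fn take_startup_lock(workdir: &std::path::Path) -> anyhow::Result<()> {
    let lock_path = workdir.join("service.pid");
    match create_lock_file(&lock_path, std::process::id().to_string()) {
        LockCreationResult::Created { file, .. } => {
            // Leak the handle: the fd (and the flock on it) now lives until the
            // process exits, even if this stack frame unwinds or panics.
            let _ = Box::leak(Box::new(file));
            Ok(())
        }
        LockCreationResult::AlreadyLocked {
            existing_lock_contents,
        } => anyhow::bail!("already running with PID {existing_lock_contents}"),
        LockCreationResult::CreationFailed(e) => Err(e),
    }
}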

View File

@@ -1,10 +1,6 @@
-use std::{
-    fs::{File, OpenOptions},
-    path::Path,
-    str::FromStr,
-};
+use std::str::FromStr;

-use anyhow::{Context, Result};
+use anyhow::Context;
 use strum_macros::{EnumString, EnumVariantNames};

 #[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
@@ -25,19 +21,8 @@ impl LogFormat {
         })
     }
 }

-pub fn init(
-    log_filename: impl AsRef<Path>,
-    daemonize: bool,
-    log_format: LogFormat,
-) -> Result<File> {
-    // Don't open the same file for output multiple times;
-    // the different fds could overwrite each other's output.
-    let log_file = OpenOptions::new()
-        .create(true)
-        .append(true)
-        .open(&log_filename)
-        .with_context(|| format!("failed to open {:?}", log_filename.as_ref()))?;
-
+pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
     let default_filter_str = "info";

     // We fall back to printing all spans at info-level or above if
@@ -45,50 +30,16 @@ pub fn init(
     let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
         .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_filter_str));

-    let x: File = log_file.try_clone().unwrap();
     let base_logger = tracing_subscriber::fmt()
         .with_env_filter(env_filter)
         .with_target(false)
         .with_ansi(false)
-        .with_writer(move || -> Box<dyn std::io::Write> {
-            // we are cloning and returning log file in order to allow redirecting daemonized stdout and stderr to it
-            // if we do not use daemonization (e.g. in docker) it is better to log to stdout directly
-            // for example to be in line with docker log command which expects logs coming from stdout
-            if daemonize {
-                Box::new(x.try_clone().unwrap())
-            } else {
-                Box::new(std::io::stdout())
-            }
-        });
+        .with_writer(std::io::stdout);

     match log_format {
         LogFormat::Json => base_logger.json().init(),
         LogFormat::Plain => base_logger.init(),
     }

-    Ok(log_file)
+    Ok(())
 }
-
-// #[cfg(test)]
-// Due to global logger, can't run tests in same process.
-// So until there's a non-global one, the tests are in ../tests/ as separate files.
-#[macro_export(local_inner_macros)]
-macro_rules! test_init_file_logger {
-    ($log_level:expr, $log_format:expr) => {{
-        use std::str::FromStr;
-        std::env::set_var("RUST_LOG", $log_level);
-        let tmp_dir = tempfile::TempDir::new().unwrap();
-        let log_file_path = tmp_dir.path().join("logfile");
-        let log_format = $crate::logging::LogFormat::from_str($log_format).unwrap();
-        let _log_file = $crate::logging::init(&log_file_path, true, log_format).unwrap();
-        let log_file = std::fs::OpenOptions::new()
-            .read(true)
-            .open(&log_file_path)
-            .unwrap();
-        log_file
-    }};
-}
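With daemonization gone, log initialization in the storage binaries shrinks to a single call that always writes to stdout, leaving redirection to the parent process (or docker). A sketch of the new entry point; `LogFormat::Plain` is just an example, real callers read the format from their config:

use utils::logging::{self, LogFormat};

fn main() -> anyhow::Result<()> {
    // Logs now always go to stdout; whoever spawned us decides where they land.
    logging::init(LogFormat::Plain)?;
    tracing::info!("logging initialized");
    Ok(())
}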

View File

@@ -1,36 +0,0 @@
// This could be in ../src/logging.rs but since the logger is global, these
// can't be run in threads of the same process
use std::fs::File;
use std::io::{BufRead, BufReader, Lines};
use tracing::*;
use utils::test_init_file_logger;

fn read_lines(file: File) -> Lines<BufReader<File>> {
    BufReader::new(file).lines()
}

#[test]
fn test_json_format_has_message_and_custom_field() {
    std::env::set_var("RUST_LOG", "info");
    let log_file = test_init_file_logger!("info", "json");

    let custom_field: &str = "hi";
    trace!(custom = %custom_field, "test log message");
    debug!(custom = %custom_field, "test log message");
    info!(custom = %custom_field, "test log message");
    warn!(custom = %custom_field, "test log message");
    error!(custom = %custom_field, "test log message");

    let lines = read_lines(log_file);
    for line in lines {
        let content = line.unwrap();
        let json_object = serde_json::from_str::<serde_json::Value>(&content).unwrap();
        assert_eq!(json_object["fields"]["custom"], "hi");
        assert_eq!(json_object["fields"]["message"], "test log message");
        assert_ne!(json_object["level"], "TRACE");
        assert_ne!(json_object["level"], "DEBUG");
    }
}

View File

@@ -1,36 +0,0 @@
// This could be in ../src/logging.rs but since the logger is global, these
// can't be run in threads of the same process
use std::fs::File;
use std::io::{BufRead, BufReader, Lines};
use tracing::*;
use utils::test_init_file_logger;

fn read_lines(file: File) -> Lines<BufReader<File>> {
    BufReader::new(file).lines()
}

#[test]
fn test_plain_format_has_message_and_custom_field() {
    std::env::set_var("RUST_LOG", "warn");
    let log_file = test_init_file_logger!("warn", "plain");

    let custom_field: &str = "hi";
    trace!(custom = %custom_field, "test log message");
    debug!(custom = %custom_field, "test log message");
    info!(custom = %custom_field, "test log message");
    warn!(custom = %custom_field, "test log message");
    error!(custom = %custom_field, "test log message");

    let lines = read_lines(log_file);
    for line in lines {
        let content = line.unwrap();
        serde_json::from_str::<serde_json::Value>(&content).unwrap_err();
        assert!(content.contains("custom=hi"));
        assert!(content.contains("test log message"));
        assert!(!content.contains("TRACE"));
        assert!(!content.contains("DEBUG"));
        assert!(!content.contains("INFO"));
    }
}

View File

@@ -24,7 +24,6 @@ hex = "0.4.3"
 hyper = "0.14"
 itertools = "0.10.3"
 clap = { version = "4.0", features = ["string"] }
-daemonize = "0.4.1"
 tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] }
 tokio-util = { version = "0.7.3", features = ["io", "io-util"] }
 postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }

View File

@@ -1,17 +1,14 @@
 //! Main entry point for the Page Server executable.

+use remote_storage::GenericRemoteStorage;
 use std::{env, ops::ControlFlow, path::Path, str::FromStr};

+use anyhow::{anyhow, Context};
+use clap::{Arg, ArgAction, Command};
+use fail::FailScenario;
+use nix::unistd::Pid;
 use tracing::*;

-use anyhow::{anyhow, bail, Context, Result};
-use clap::{Arg, ArgAction, Command};
-use daemonize::Daemonize;
-use fail::FailScenario;
 use metrics::set_build_info_metric;
 use pageserver::{
     config::{defaults::*, PageServerConf},
     http, page_cache, page_service, profiling, task_mgr,
@@ -19,20 +16,22 @@ use pageserver::{
     task_mgr::{
         BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
     },
-    tenant_mgr, virtual_file, LOG_FILE_NAME,
+    tenant_mgr, virtual_file,
 };
-use remote_storage::GenericRemoteStorage;
 use utils::{
     auth::JwtAuth,
-    logging,
+    lock_file, logging,
     postgres_backend::AuthType,
     project_git_version,
-    shutdown::exit_now,
     signals::{self, Signal},
     tcp_listener,
 };

 project_git_version!(GIT_VERSION);

+const PID_FILE_NAME: &str = "pageserver.pid";
+
 const FEATURES: &[&str] = &[
     #[cfg(feature = "testing")]
     "testing",
@@ -65,6 +64,7 @@ fn main() -> anyhow::Result<()> {
     let workdir = workdir
         .canonicalize()
         .with_context(|| format!("Error opening workdir '{}'", workdir.display()))?;
+
     let cfg_file_path = workdir.join("pageserver.toml");

     // Set CWD to workdir for non-daemon modes
@@ -75,8 +75,6 @@ fn main() -> anyhow::Result<()> {
         )
     })?;

-    let daemonize = arg_matches.get_flag("daemonize");
-
     let conf = match initialize_config(&cfg_file_path, arg_matches, &workdir)? {
         ControlFlow::Continue(conf) => conf,
         ControlFlow::Break(()) => {
@@ -102,7 +100,7 @@ fn main() -> anyhow::Result<()> {
     virtual_file::init(conf.max_file_descriptors);
     page_cache::init(conf.page_cache_size);

-    start_pageserver(conf, daemonize).context("Failed to start pageserver")?;
+    start_pageserver(conf).context("Failed to start pageserver")?;

     scenario.teardown();
     Ok(())
@@ -197,12 +195,34 @@ fn initialize_config(
     })
 }

-fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()> {
-    // Initialize logger
-    let log_file = logging::init(LOG_FILE_NAME, daemonize, conf.log_format)?;
+fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
+    logging::init(conf.log_format)?;

     info!("version: {}", version());

+    let lock_file_path = conf.workdir.join(PID_FILE_NAME);
+    let lock_file = match lock_file::create_lock_file(&lock_file_path, Pid::this().to_string()) {
+        lock_file::LockCreationResult::Created {
+            new_lock_contents,
+            file,
+        } => {
+            info!("Created lock file at {lock_file_path:?} with contents {new_lock_contents}");
+            file
+        }
+        lock_file::LockCreationResult::AlreadyLocked {
+            existing_lock_contents,
+        } => anyhow::bail!(
+            "Could not lock pid file; pageserver is already running in {:?} with PID {}",
+            conf.workdir,
+            existing_lock_contents
+        ),
+        lock_file::LockCreationResult::CreationFailed(e) => {
+            return Err(e.context(format!("Failed to create lock file at {lock_file_path:?}")))
+        }
+    };
+    // ensure that the lock file is held even if the main thread of the process panics
+    // we need to release the lock file only when the current process is gone
+    let _ = Box::leak(Box::new(lock_file));
+
     // TODO: Check that it looks like a valid repository before going further

     // bind sockets before daemonizing so we report errors early and do not return until we are listening
@@ -218,33 +238,6 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
     );
     let pageserver_listener = tcp_listener::bind(conf.listen_pg_addr.clone())?;

-    // NB: Don't spawn any threads before daemonizing!
-    if daemonize {
-        info!("daemonizing...");
-
-        // There shouldn't be any logging to stdin/stdout. Redirect it to the main log so
-        // that we will see any accidental manual fprintf's or backtraces.
-        let stdout = log_file
-            .try_clone()
-            .with_context(|| format!("Failed to clone log file '{:?}'", log_file))?;
-        let stderr = log_file;
-
-        let daemonize = Daemonize::new()
-            .pid_file("pageserver.pid")
-            .working_directory(".")
-            .stdout(stdout)
-            .stderr(stderr);
-
-        // XXX: The parent process should exit abruptly right after
-        // it has spawned a child to prevent coverage machinery from
-        // dumping stats into a `profraw` file now owned by the child.
-        // Otherwise, the coverage data will be damaged.
-        match daemonize.exit_action(|| exit_now(0)).start() {
-            Ok(_) => info!("Success, daemonized"),
-            Err(err) => bail!("{err}. could not daemonize. bailing."),
-        }
-    }
-
     let signals = signals::install_shutdown_handlers()?;

     // start profiler (if enabled)
@@ -347,14 +340,6 @@ fn cli() -> Command {
     Command::new("Neon page server")
         .about("Materializes WAL stream to pages and serves them to the postgres")
         .version(version())
-        .arg(
-            Arg::new("daemonize")
-                .short('d')
-                .long("daemonize")
-                .action(ArgAction::SetTrue)
-                .help("Run in the background"),
-        )
         .arg(
             Arg::new("init")
                 .long("init")

View File

@@ -43,8 +43,6 @@ pub const DEFAULT_PG_VERSION: u32 = 14;
 pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
 pub const DELTA_FILE_MAGIC: u16 = 0x5A61;

-pub const LOG_FILE_NAME: &str = "pageserver.log";
-
 static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);

 /// Config for the Repository checkpointer
@@ -81,7 +79,6 @@ pub async fn shutdown_pageserver(exit_code: i32) {
     // There should be nothing left, but let's be sure
     task_mgr::shutdown_tasks(None, None, None).await;
     info!("Shut down successfully completed");
-
     std::process::exit(exit_code);
 }

View File

@@ -671,10 +671,6 @@ impl PostgresRedoProcess {
             // The Rust standard library makes sure to mark any file descriptors with
             // as close-on-exec by default, but that's not enough, since we use
             // libraries that directly call libc open without setting that flag.
-            //
-            // One example is the pidfile of the daemonize library, which doesn't
-            // currently mark file descriptors as close-on-exec. Either way, we
-            // want to be on the safe side and prevent accidental regression.
             .close_fds()
             .spawn()
             .map_err(|e| {

poetry.lock (generated)
View File

@@ -1568,7 +1568,7 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=
 [metadata]
 lock-version = "1.1"
 python-versions = "^3.9"
-content-hash = "17cdbfe90f1b06dffaf24c3e076384ec08dd4a2dce5a05e50565f7364932eb2d"
+content-hash = "9352a89d49d34807f6a58f6c3f898acbd8cf3570e0f45ede973673644bde4d0e"

 [metadata.files]
 aiopg = [
@@ -1978,6 +1978,7 @@ prometheus-client = [
 psycopg2-binary = [
     {file = "psycopg2-binary-2.9.3.tar.gz", hash = "sha256:761df5313dc15da1502b21453642d7599d26be88bff659382f8f9747c7ebea4e"},
     {file = "psycopg2_binary-2.9.3-cp310-cp310-macosx_10_14_x86_64.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl", hash = "sha256:539b28661b71da7c0e428692438efbcd048ca21ea81af618d845e06ebfd29478"},
+    {file = "psycopg2_binary-2.9.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:2f2534ab7dc7e776a263b463a16e189eb30e85ec9bbe1bff9e78dae802608932"},
     {file = "psycopg2_binary-2.9.3-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6e82d38390a03da28c7985b394ec3f56873174e2c88130e6966cb1c946508e65"},
     {file = "psycopg2_binary-2.9.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:57804fc02ca3ce0dbfbef35c4b3a4a774da66d66ea20f4bda601294ad2ea6092"},
     {file = "psycopg2_binary-2.9.3-cp310-cp310-manylinux_2_24_aarch64.whl", hash = "sha256:083a55275f09a62b8ca4902dd11f4b33075b743cf0d360419e2051a8a5d5ff76"},
@@ -2011,6 +2012,7 @@ psycopg2-binary = [
     {file = "psycopg2_binary-2.9.3-cp37-cp37m-win32.whl", hash = "sha256:adf20d9a67e0b6393eac162eb81fb10bc9130a80540f4df7e7355c2dd4af9fba"},
     {file = "psycopg2_binary-2.9.3-cp37-cp37m-win_amd64.whl", hash = "sha256:2f9ffd643bc7349eeb664eba8864d9e01f057880f510e4681ba40a6532f93c71"},
     {file = "psycopg2_binary-2.9.3-cp38-cp38-macosx_10_14_x86_64.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl", hash = "sha256:def68d7c21984b0f8218e8a15d514f714d96904265164f75f8d3a70f9c295667"},
+    {file = "psycopg2_binary-2.9.3-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:e6aa71ae45f952a2205377773e76f4e3f27951df38e69a4c95440c779e013560"},
     {file = "psycopg2_binary-2.9.3-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:dffc08ca91c9ac09008870c9eb77b00a46b3378719584059c034b8945e26b272"},
     {file = "psycopg2_binary-2.9.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:280b0bb5cbfe8039205c7981cceb006156a675362a00fe29b16fbc264e242834"},
     {file = "psycopg2_binary-2.9.3-cp38-cp38-manylinux_2_24_aarch64.whl", hash = "sha256:af9813db73395fb1fc211bac696faea4ca9ef53f32dc0cfa27e4e7cf766dcf24"},
@@ -2022,6 +2024,7 @@ psycopg2-binary = [
     {file = "psycopg2_binary-2.9.3-cp38-cp38-win32.whl", hash = "sha256:6472a178e291b59e7f16ab49ec8b4f3bdada0a879c68d3817ff0963e722a82ce"},
     {file = "psycopg2_binary-2.9.3-cp38-cp38-win_amd64.whl", hash = "sha256:35168209c9d51b145e459e05c31a9eaeffa9a6b0fd61689b48e07464ffd1a83e"},
     {file = "psycopg2_binary-2.9.3-cp39-cp39-macosx_10_14_x86_64.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl", hash = "sha256:47133f3f872faf28c1e87d4357220e809dfd3fa7c64295a4a148bcd1e6e34ec9"},
+    {file = "psycopg2_binary-2.9.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:b3a24a1982ae56461cc24f6680604fffa2c1b818e9dc55680da038792e004d18"},
     {file = "psycopg2_binary-2.9.3-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:91920527dea30175cc02a1099f331aa8c1ba39bf8b7762b7b56cbf54bc5cce42"},
     {file = "psycopg2_binary-2.9.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:887dd9aac71765ac0d0bac1d0d4b4f2c99d5f5c1382d8b770404f0f3d0ce8a39"},
     {file = "psycopg2_binary-2.9.3-cp39-cp39-manylinux_2_24_aarch64.whl", hash = "sha256:1f14c8b0942714eb3c74e1e71700cbbcb415acbc311c730370e70c578a44a25c"},
@@ -2038,18 +2041,7 @@ py = [
     {file = "py-1.11.0.tar.gz", hash = "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"},
 ]
 pyasn1 = [
-    {file = "pyasn1-0.4.8-py2.4.egg", hash = "sha256:fec3e9d8e36808a28efb59b489e4528c10ad0f480e57dcc32b4de5c9d8c9fdf3"},
-    {file = "pyasn1-0.4.8-py2.5.egg", hash = "sha256:0458773cfe65b153891ac249bcf1b5f8f320b7c2ce462151f8fa74de8934becf"},
-    {file = "pyasn1-0.4.8-py2.6.egg", hash = "sha256:5c9414dcfede6e441f7e8f81b43b34e834731003427e5b09e4e00e3172a10f00"},
-    {file = "pyasn1-0.4.8-py2.7.egg", hash = "sha256:6e7545f1a61025a4e58bb336952c5061697da694db1cae97b116e9c46abcf7c8"},
     {file = "pyasn1-0.4.8-py2.py3-none-any.whl", hash = "sha256:39c7e2ec30515947ff4e87fb6f456dfc6e84857d34be479c9d4a4ba4bf46aa5d"},
-    {file = "pyasn1-0.4.8-py3.1.egg", hash = "sha256:78fa6da68ed2727915c4767bb386ab32cdba863caa7dbe473eaae45f9959da86"},
-    {file = "pyasn1-0.4.8-py3.2.egg", hash = "sha256:08c3c53b75eaa48d71cf8c710312316392ed40899cb34710d092e96745a358b7"},
-    {file = "pyasn1-0.4.8-py3.3.egg", hash = "sha256:03840c999ba71680a131cfaee6fab142e1ed9bbd9c693e285cc6aca0d555e576"},
-    {file = "pyasn1-0.4.8-py3.4.egg", hash = "sha256:7ab8a544af125fb704feadb008c99a88805126fb525280b2270bb25cc1d78a12"},
-    {file = "pyasn1-0.4.8-py3.5.egg", hash = "sha256:e89bf84b5437b532b0803ba5c9a5e054d21fec423a89952a74f87fa2c9b7bce2"},
-    {file = "pyasn1-0.4.8-py3.6.egg", hash = "sha256:014c0e9976956a08139dc0712ae195324a75e142284d5f87f1a87ee1b068a359"},
-    {file = "pyasn1-0.4.8-py3.7.egg", hash = "sha256:99fcc3c8d804d1bc6d9a099921e39d827026409a58f2a720dcdb89374ea0c776"},
     {file = "pyasn1-0.4.8.tar.gz", hash = "sha256:aef77c9fb94a3ac588e87841208bdec464471d9871bd5050a287cc9a475cd0ba"},
 ]
 pycodestyle = [
@@ -2159,6 +2151,13 @@ pyyaml = [
     {file = "PyYAML-6.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:f84fbc98b019fef2ee9a1cb3ce93e3187a6df0b2538a651bfb890254ba9f90b5"},
     {file = "PyYAML-6.0-cp310-cp310-win32.whl", hash = "sha256:2cd5df3de48857ed0544b34e2d40e9fac445930039f3cfe4bcc592a1f836d513"},
     {file = "PyYAML-6.0-cp310-cp310-win_amd64.whl", hash = "sha256:daf496c58a8c52083df09b80c860005194014c3698698d1a57cbcfa182142a3a"},
+    {file = "PyYAML-6.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:d4b0ba9512519522b118090257be113b9468d804b19d63c71dbcf4a48fa32358"},
+    {file = "PyYAML-6.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:81957921f441d50af23654aa6c5e5eaf9b06aba7f0a19c18a538dc7ef291c5a1"},
+    {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:afa17f5bc4d1b10afd4466fd3a44dc0e245382deca5b3c353d8b757f9e3ecb8d"},
+    {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:dbad0e9d368bb989f4515da330b88a057617d16b6a8245084f1b05400f24609f"},
+    {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:432557aa2c09802be39460360ddffd48156e30721f5e8d917f01d31694216782"},
+    {file = "PyYAML-6.0-cp311-cp311-win32.whl", hash = "sha256:bfaef573a63ba8923503d27530362590ff4f576c626d86a9fed95822a8255fd7"},
+    {file = "PyYAML-6.0-cp311-cp311-win_amd64.whl", hash = "sha256:01b45c0191e6d66c470b6cf1b9531a771a83c1c4208272ead47a3ae4f2f603bf"},
     {file = "PyYAML-6.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:897b80890765f037df3403d22bab41627ca8811ae55e9a722fd0392850ec4d86"},
{file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:50602afada6d6cbfad699b0c7bb50d5ccffa7e46a3d738092afddc1f9758427f"}, {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:50602afada6d6cbfad699b0c7bb50d5ccffa7e46a3d738092afddc1f9758427f"},
{file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:48c346915c114f5fdb3ead70312bd042a953a8ce5c7106d5bfb1a5254e47da92"}, {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:48c346915c114f5fdb3ead70312bd042a953a8ce5c7106d5bfb1a5254e47da92"},


@@ -12,7 +12,7 @@ fs2 = "0.4.3"
serde_json = "1" serde_json = "1"
tracing = "0.1.27" tracing = "0.1.27"
clap = "4.0" clap = "4.0"
daemonize = "0.4.1" nix = "0.25"
tokio = { version = "1.17", features = ["macros", "fs"] } tokio = { version = "1.17", features = ["macros", "fs"] }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
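The dependency swap above is the heart of the change for safekeeper: daemonize is gone, and nix covers the remaining pid handling. The binary only records its own pid in the lock file, and a supervisor such as the Neon CLI can read that pid back when it needs to signal the process. A minimal sketch of that pattern, assuming nix 0.25; the helper names are illustrative, not part of the codebase:

use std::fs;
use std::path::Path;

use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;

// Record the current process's pid where a supervisor can find it.
fn write_pid_file(path: &Path) -> std::io::Result<()> {
    fs::write(path, Pid::this().to_string())
}

// Later, a supervisor reads the pid back and asks the service to stop.
fn stop_by_pid_file(path: &Path) -> anyhow::Result<()> {
    let pid: i32 = fs::read_to_string(path)?.trim().parse()?;
    // SIGTERM gives the service a chance to shut down cleanly; kill(2) via nix.
    kill(Pid::from_raw(pid), Signal::SIGTERM)?;
    Ok(())
}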


@@ -4,8 +4,7 @@
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, Command}; use clap::{value_parser, Arg, ArgAction, Command};
use const_format::formatcp; use const_format::formatcp;
use daemonize::Daemonize; use nix::unistd::Pid;
use fs2::FileExt;
use remote_storage::RemoteStorageConfig; use remote_storage::RemoteStorageConfig;
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::{ErrorKind, Write}; use std::io::{ErrorKind, Write};
@@ -16,6 +15,7 @@ use tokio::sync::mpsc;
use toml_edit::Document; use toml_edit::Document;
use tracing::*; use tracing::*;
use url::{ParseError, Url}; use url::{ParseError, Url};
use utils::lock_file;
use metrics::set_build_info_metric; use metrics::set_build_info_metric;
use safekeeper::broker; use safekeeper::broker;
@@ -35,12 +35,10 @@ use utils::{
http::endpoint, http::endpoint,
id::NodeId, id::NodeId,
logging::{self, LogFormat}, logging::{self, LogFormat},
project_git_version, project_git_version, signals, tcp_listener,
shutdown::exit_now,
signals, tcp_listener,
}; };
const LOCK_FILE_NAME: &str = "safekeeper.lock"; const PID_FILE_NAME: &str = "safekeeper.pid";
const ID_FILE_NAME: &str = "safekeeper.id"; const ID_FILE_NAME: &str = "safekeeper.id";
project_git_version!(GIT_VERSION); project_git_version!(GIT_VERSION);
@@ -65,10 +63,6 @@ fn main() -> anyhow::Result<()> {
conf.no_sync = true; conf.no_sync = true;
} }
if arg_matches.get_flag("daemonize") {
conf.daemonize = true;
}
if let Some(addr) = arg_matches.get_one::<String>("listen-pg") { if let Some(addr) = arg_matches.get_one::<String>("listen-pg") {
conf.listen_pg_addr = addr.to_string(); conf.listen_pg_addr = addr.to_string();
} }
@@ -143,19 +137,33 @@ fn main() -> anyhow::Result<()> {
} }
fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bool) -> Result<()> { fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bool) -> Result<()> {
let log_file = logging::init("safekeeper.log", conf.daemonize, conf.log_format)?; logging::init(conf.log_format)?;
info!("version: {GIT_VERSION}"); info!("version: {GIT_VERSION}");
// Prevent running multiple safekeepers on the same directory // Prevent running multiple safekeepers on the same directory
let lock_file_path = conf.workdir.join(LOCK_FILE_NAME); let lock_file_path = conf.workdir.join(PID_FILE_NAME);
let lock_file = File::create(&lock_file_path).context("failed to open lockfile")?; let lock_file = match lock_file::create_lock_file(&lock_file_path, Pid::this().to_string()) {
lock_file.try_lock_exclusive().with_context(|| { lock_file::LockCreationResult::Created {
format!( new_lock_contents,
"control file {} is locked by some other process", file,
lock_file_path.display() } => {
) info!("Created lock file at {lock_file_path:?} with contents {new_lock_contents}");
})?; file
}
lock_file::LockCreationResult::AlreadyLocked {
existing_lock_contents,
} => anyhow::bail!(
"Could not lock pid file; safekeeper is already running in {:?} with PID {}",
conf.workdir,
existing_lock_contents
),
lock_file::LockCreationResult::CreationFailed(e) => {
return Err(e.context(format!("Failed to create lock file at {lock_file_path:?}")))
}
};
// ensure that the lock file is held even if the main thread of the process panics;
// the lock should be released only when the current process is gone
let _ = Box::leak(Box::new(lock_file));
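utils::lock_file itself is not part of this diff. A minimal sketch of what a create_lock_file like the one used above could look like on top of fs2's advisory locks (every name and detail here is an assumption, not the actual module):

use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::path::Path;

use fs2::FileExt;

pub enum LockCreationResult {
    Created { new_lock_contents: String, file: File },
    AlreadyLocked { existing_lock_contents: String },
    CreationFailed(anyhow::Error),
}

pub fn create_lock_file(path: &Path, contents: String) -> LockCreationResult {
    // Open without truncating: a stale file left by a dead process must not be
    // clobbered before we know that nobody else holds the lock.
    let mut file = match OpenOptions::new().create(true).read(true).write(true).open(path) {
        Ok(file) => file,
        Err(e) => return LockCreationResult::CreationFailed(anyhow::Error::new(e)),
    };
    match file.try_lock_exclusive() {
        Ok(()) => {
            // We own the lock, so it is now safe to replace the old contents.
            match file.set_len(0).and_then(|_| file.write_all(contents.as_bytes())) {
                Ok(()) => LockCreationResult::Created { new_lock_contents: contents, file },
                Err(e) => LockCreationResult::CreationFailed(anyhow::Error::new(e)),
            }
        }
        // Advisory locks do not block reads, so we can report who holds the lock.
        Err(_) => LockCreationResult::AlreadyLocked {
            existing_lock_contents: fs::read_to_string(path).unwrap_or_default(),
        },
    }
}

Leaking the returned File, as the code above does, keeps the descriptor and therefore the advisory lock alive until the operating system reclaims both at process exit.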
// Set or read our ID. // Set or read our ID.
set_id(&mut conf, given_id)?; set_id(&mut conf, given_id)?;
@@ -187,31 +195,6 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
} }
}; };
// XXX: Don't spawn any threads before daemonizing!
if conf.daemonize {
info!("daemonizing...");
// There should'n be any logging to stdin/stdout. Redirect it to the main log so
// that we will see any accidental manual fprintf's or backtraces.
let stdout = log_file.try_clone().unwrap();
let stderr = log_file;
let daemonize = Daemonize::new()
.pid_file("safekeeper.pid")
.working_directory(Path::new("."))
.stdout(stdout)
.stderr(stderr);
// XXX: The parent process should exit abruptly right after
// it has spawned a child to prevent coverage machinery from
// dumping stats into a `profraw` file now owned by the child.
// Otherwise, the coverage data will be damaged.
match daemonize.exit_action(|| exit_now(0)).start() {
Ok(_) => info!("Success, daemonized"),
Err(err) => bail!("Error: {err}. could not daemonize. bailing."),
}
}
// Register metrics collector for active timelines. It's important to do this // Register metrics collector for active timelines. It's important to do this
// after daemonizing, otherwise process collector will be upset. // after daemonizing, otherwise process collector will be upset.
let timeline_collector = safekeeper::metrics::TimelineCollector::new(); let timeline_collector = safekeeper::metrics::TimelineCollector::new();
@@ -384,13 +367,6 @@ fn cli() -> Command {
.short('p') .short('p')
.long("pageserver"), .long("pageserver"),
) )
.arg(
Arg::new("daemonize")
.short('d')
.long("daemonize")
.action(ArgAction::SetTrue)
.help("Run in the background"),
)
.arg( .arg(
Arg::new("no-sync") Arg::new("no-sync")
.short('n') .short('n')


@@ -54,7 +54,6 @@ pub struct SafeKeeperConf {
// data directories to avoid clashing with each other. // data directories to avoid clashing with each other.
pub workdir: PathBuf, pub workdir: PathBuf,
pub daemonize: bool,
pub no_sync: bool, pub no_sync: bool,
pub listen_pg_addr: String, pub listen_pg_addr: String,
pub listen_http_addr: String, pub listen_http_addr: String,
@@ -88,7 +87,6 @@ impl Default for SafeKeeperConf {
// command line, so that when the server is running, all paths are relative // command line, so that when the server is running, all paths are relative
// to that. // to that.
workdir: PathBuf::from("./"), workdir: PathBuf::from("./"),
daemonize: false,
no_sync: false, no_sync: false,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(), listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),


@@ -19,7 +19,7 @@ from dataclasses import dataclass, field
from enum import Flag, auto from enum import Flag, auto
from functools import cached_property from functools import cached_property
from pathlib import Path from pathlib import Path
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, TypeVar, Union, cast from typing import Any, Dict, Iterator, List, Optional, Tuple, Union, cast
import asyncpg import asyncpg
import backoff # type: ignore import backoff # type: ignore
@@ -36,7 +36,7 @@ from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import make_dsn, parse_dsn from psycopg2.extensions import make_dsn, parse_dsn
from typing_extensions import Literal from typing_extensions import Literal
from .utils import allure_attach_from_dir, etcd_path, get_self_dir, subprocess_capture from .utils import Fn, allure_attach_from_dir, etcd_path, get_self_dir, subprocess_capture
""" """
This file contains pytest fixtures. A fixture is a test resource that can be This file contains pytest fixtures. A fixture is a test resource that can be
@@ -56,7 +56,6 @@ put directly-importable functions into utils.py or another separate file.
""" """
Env = Dict[str, str] Env = Dict[str, str]
Fn = TypeVar("Fn", bound=Callable[..., Any])
DEFAULT_OUTPUT_DIR = "test_output" DEFAULT_OUTPUT_DIR = "test_output"
DEFAULT_BRANCH_NAME = "main" DEFAULT_BRANCH_NAME = "main"
@@ -965,11 +964,11 @@ def neon_env_builder(
yield builder yield builder
class NeonPageserverApiException(Exception): class PageserverApiException(Exception):
pass pass
class NeonPageserverHttpClient(requests.Session): class PageserverHttpClient(requests.Session):
def __init__(self, port: int, is_testing_enabled_or_skip: Fn, auth_token: Optional[str] = None): def __init__(self, port: int, is_testing_enabled_or_skip: Fn, auth_token: Optional[str] = None):
super().__init__() super().__init__()
self.port = port self.port = port
@@ -987,7 +986,7 @@ class NeonPageserverHttpClient(requests.Session):
msg = res.json()["msg"] msg = res.json()["msg"]
except: # noqa: E722 except: # noqa: E722
msg = "" msg = ""
raise NeonPageserverApiException(msg) from e raise PageserverApiException(msg) from e
def check_status(self): def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status() self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
@@ -1624,8 +1623,6 @@ class ComputeCtl(AbstractNeonCli):
class NeonPageserver(PgProtocol): class NeonPageserver(PgProtocol):
""" """
An object representing a running pageserver. An object representing a running pageserver.
Initializes the repository via `neon init`.
""" """
TEMP_FILE_SUFFIX = "___temp" TEMP_FILE_SUFFIX = "___temp"
@@ -1674,8 +1671,8 @@ class NeonPageserver(PgProtocol):
if '"profiling"' not in self.version: if '"profiling"' not in self.version:
pytest.skip("pageserver was built without 'profiling' feature") pytest.skip("pageserver was built without 'profiling' feature")
def http_client(self, auth_token: Optional[str] = None) -> NeonPageserverHttpClient: def http_client(self, auth_token: Optional[str] = None) -> PageserverHttpClient:
return NeonPageserverHttpClient( return PageserverHttpClient(
port=self.service_port.http, port=self.service_port.http,
auth_token=auth_token, auth_token=auth_token,
is_testing_enabled_or_skip=self.is_testing_enabled_or_skip, is_testing_enabled_or_skip=self.is_testing_enabled_or_skip,
@@ -2260,11 +2257,6 @@ class PostgresFactory:
return self return self
def read_pid(path: Path) -> int:
"""Read content of file into number"""
return int(path.read_text())
@dataclass @dataclass
class SafekeeperPort: class SafekeeperPort:
pg: int pg: int
@@ -2688,26 +2680,8 @@ def check_restored_datadir_content(
assert (mismatch, error) == ([], []) assert (mismatch, error) == ([], [])
def wait_until(number_of_iterations: int, interval: float, func):
"""
Wait until 'func' returns successfully, without exception. Returns the
last return value from the function.
"""
last_exception = None
for i in range(number_of_iterations):
try:
res = func()
except Exception as e:
log.info("waiting for %s iteration %s failed", func, i + 1)
last_exception = e
time.sleep(interval)
continue
return res
raise Exception("timed out while waiting for %s" % func) from last_exception
def assert_no_in_progress_downloads_for_tenant( def assert_no_in_progress_downloads_for_tenant(
pageserver_http_client: NeonPageserverHttpClient, pageserver_http_client: PageserverHttpClient,
tenant: TenantId, tenant: TenantId,
): ):
tenant_status = pageserver_http_client.tenant_status(tenant) tenant_status = pageserver_http_client.tenant_status(tenant)
@@ -2715,7 +2689,7 @@ def assert_no_in_progress_downloads_for_tenant(
def remote_consistent_lsn( def remote_consistent_lsn(
pageserver_http_client: NeonPageserverHttpClient, tenant: TenantId, timeline: TimelineId pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
) -> Lsn: ) -> Lsn:
detail = pageserver_http_client.timeline_detail(tenant, timeline) detail = pageserver_http_client.timeline_detail(tenant, timeline)
@@ -2730,7 +2704,7 @@ def remote_consistent_lsn(
def wait_for_upload( def wait_for_upload(
pageserver_http_client: NeonPageserverHttpClient, pageserver_http_client: PageserverHttpClient,
tenant: TenantId, tenant: TenantId,
timeline: TimelineId, timeline: TimelineId,
lsn: Lsn, lsn: Lsn,
@@ -2754,7 +2728,7 @@ def wait_for_upload(
def last_record_lsn( def last_record_lsn(
pageserver_http_client: NeonPageserverHttpClient, tenant: TenantId, timeline: TimelineId pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
) -> Lsn: ) -> Lsn:
detail = pageserver_http_client.timeline_detail(tenant, timeline) detail = pageserver_http_client.timeline_detail(tenant, timeline)
@@ -2764,7 +2738,7 @@ def last_record_lsn(
def wait_for_last_record_lsn( def wait_for_last_record_lsn(
pageserver_http_client: NeonPageserverHttpClient, pageserver_http_client: PageserverHttpClient,
tenant: TenantId, tenant: TenantId,
timeline: TimelineId, timeline: TimelineId,
lsn: Lsn, lsn: Lsn,


@@ -4,13 +4,16 @@ import re
import shutil import shutil
import subprocess import subprocess
import tarfile import tarfile
import time
from pathlib import Path from pathlib import Path
from typing import Any, List, Tuple from typing import Any, Callable, List, Tuple, TypeVar
import allure # type: ignore import allure # type: ignore
from fixtures.log_helper import log from fixtures.log_helper import log
from psycopg2.extensions import cursor from psycopg2.extensions import cursor
Fn = TypeVar("Fn", bound=Callable[..., Any])
def get_self_dir() -> str: def get_self_dir() -> str:
"""Get the path to the directory where this script lives.""" """Get the path to the directory where this script lives."""
@@ -188,3 +191,57 @@ def allure_attach_from_dir(dir: Path):
extension = attachment.suffix.removeprefix(".") extension = attachment.suffix.removeprefix(".")
allure.attach.file(source, name, attachment_type, extension) allure.attach.file(source, name, attachment_type, extension)
def start_in_background(
command: List[str], cwd: Path, log_file_name: str, is_started: Fn
) -> subprocess.Popen[bytes]:
"""Starts a process, creates the logfile and redirects stderr and stdout there. Runs the start checks before the process is started, or errors."""
log.info(f'Running command "{" ".join(command)}"')
with open(cwd / log_file_name, "wb") as log_file:
spawned_process = subprocess.Popen(command, stdout=log_file, stderr=log_file, cwd=cwd)
error = None
try:
return_code = spawned_process.poll()
if return_code is not None:
error = f"expected subprocess to run but it exited with code {return_code}"
else:
attempts = 10
try:
wait_until(
number_of_iterations=attempts,
interval=1,
func=is_started,
)
except Exception:
error = f"Failed to get correct status from subprocess in {attempts} attempts"
except Exception as e:
error = f"expected subprocess to start but it failed with exception: {e}"
if error is not None:
log.error(error)
spawned_process.kill()
raise Exception(f"Failed to run subprocess as {command}, reason: {error}")
log.info("subprocess spawned")
return spawned_process
def wait_until(number_of_iterations: int, interval: float, func: Fn):
"""
Wait until 'func' returns successfully, without exception. Returns the
last return value from the function.
"""
last_exception = None
for i in range(number_of_iterations):
try:
res = func()
except Exception as e:
log.info("waiting for %s iteration %s failed", func, i + 1)
last_exception = e
time.sleep(interval)
continue
return res
raise Exception("timed out while waiting for %s" % func) from last_exception


@@ -1,7 +1,7 @@
from contextlib import closing from contextlib import closing
import pytest import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserverApiException from fixtures.neon_fixtures import NeonEnvBuilder, PageserverApiException
from fixtures.types import TenantId from fixtures.types import TenantId
@@ -39,7 +39,7 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
# fail to create branch using token with different tenant_id # fail to create branch using token with different tenant_id
with pytest.raises( with pytest.raises(
NeonPageserverApiException, match="Forbidden: Tenant id mismatch. Permission denied" PageserverApiException, match="Forbidden: Tenant id mismatch. Permission denied"
): ):
invalid_tenant_http_client.timeline_create( invalid_tenant_http_client.timeline_create(
tenant_id=env.initial_tenant, ancestor_timeline_id=new_timeline_id tenant_id=env.initial_tenant, ancestor_timeline_id=new_timeline_id
@@ -50,7 +50,7 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
# fail to create tenant using tenant token # fail to create tenant using tenant token
with pytest.raises( with pytest.raises(
NeonPageserverApiException, PageserverApiException,
match="Forbidden: Attempt to access management api with tenant scope. Permission denied", match="Forbidden: Attempt to access management api with tenant scope. Permission denied",
): ):
tenant_http_client.tenant_create() tenant_http_client.tenant_create()


@@ -10,7 +10,7 @@ import toml
from fixtures.neon_fixtures import ( from fixtures.neon_fixtures import (
NeonCli, NeonCli,
NeonEnvBuilder, NeonEnvBuilder,
NeonPageserverHttpClient, PageserverHttpClient,
PgBin, PgBin,
PortDistributor, PortDistributor,
wait_for_last_record_lsn, wait_for_last_record_lsn,
@@ -208,7 +208,7 @@ def test_backward_compatibility(
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id] timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
pageserver_port = snapshot_config["pageserver"]["listen_http_addr"].split(":")[-1] pageserver_port = snapshot_config["pageserver"]["listen_http_addr"].split(":")[-1]
auth_token = snapshot_config["pageserver"]["auth_token"] auth_token = snapshot_config["pageserver"]["auth_token"]
pageserver_http = NeonPageserverHttpClient( pageserver_http = PageserverHttpClient(
port=pageserver_port, port=pageserver_port,
is_testing_enabled_or_skip=lambda: True, # TODO: check if testing really enabled is_testing_enabled_or_skip=lambda: True, # TODO: check if testing really enabled
auth_token=auth_token, auth_token=auth_token,


@@ -5,13 +5,13 @@ from fixtures.neon_fixtures import (
DEFAULT_BRANCH_NAME, DEFAULT_BRANCH_NAME,
NeonEnv, NeonEnv,
NeonEnvBuilder, NeonEnvBuilder,
NeonPageserverHttpClient, PageserverHttpClient,
) )
from fixtures.types import TenantId, TimelineId from fixtures.types import TenantId, TimelineId
def helper_compare_timeline_list( def helper_compare_timeline_list(
pageserver_http_client: NeonPageserverHttpClient, env: NeonEnv, initial_tenant: TenantId pageserver_http_client: PageserverHttpClient, env: NeonEnv, initial_tenant: TenantId
): ):
""" """
Compare timelines list returned by CLI and directly via API. Compare timelines list returned by CLI and directly via API.
@@ -56,7 +56,7 @@ def test_cli_timeline_list(neon_simple_env: NeonEnv):
assert nested_timeline_id in timelines_cli assert nested_timeline_id in timelines_cli
def helper_compare_tenant_list(pageserver_http_client: NeonPageserverHttpClient, env: NeonEnv): def helper_compare_tenant_list(pageserver_http_client: PageserverHttpClient, env: NeonEnv):
tenants = pageserver_http_client.tenant_list() tenants = pageserver_http_client.tenant_list()
tenants_api = sorted(map(lambda t: cast(str, t["id"]), tenants)) tenants_api = sorted(map(lambda t: cast(str, t["id"]), tenants))


@@ -1,9 +1,9 @@
import pytest import pytest
from fixtures.log_helper import log from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverHttpClient from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PageserverHttpClient
def check_tenant(env: NeonEnv, pageserver_http: NeonPageserverHttpClient): def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient):
tenant_id, timeline_id = env.neon_cli.create_tenant() tenant_id, timeline_id = env.neon_cli.create_tenant()
pg = env.postgres.create_start("main", tenant_id=tenant_id) pg = env.postgres.create_start("main", tenant_id=tenant_id)
# we rely upon autocommit after each statement # we rely upon autocommit after each statement


@@ -6,12 +6,12 @@ from fixtures.neon_fixtures import (
DEFAULT_BRANCH_NAME, DEFAULT_BRANCH_NAME,
NeonEnv, NeonEnv,
NeonEnvBuilder, NeonEnvBuilder,
NeonPageserverHttpClient, PageserverHttpClient,
neon_binpath, neon_binpath,
pg_distrib_dir, pg_distrib_dir,
wait_until,
) )
from fixtures.types import Lsn, TenantId, TimelineId from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
# test that we cannot override node id after init # test that we cannot override node id after init
@@ -29,8 +29,9 @@ def test_pageserver_init_node_id(neon_simple_env: NeonEnv):
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
) )
# remove initial config # remove initial config and stop existing pageserver
pageserver_config.unlink() pageserver_config.unlink()
neon_simple_env.pageserver.stop()
bad_init = run_pageserver(["--init", "-c", f'pg_distrib_dir="{pg_distrib_dir}"']) bad_init = run_pageserver(["--init", "-c", f'pg_distrib_dir="{pg_distrib_dir}"'])
assert ( assert (
@@ -60,7 +61,7 @@ def test_pageserver_init_node_id(neon_simple_env: NeonEnv):
assert "has node id already, it cannot be overridden" in bad_update.stderr assert "has node id already, it cannot be overridden" in bad_update.stderr
def check_client(client: NeonPageserverHttpClient, initial_tenant: TenantId): def check_client(client: PageserverHttpClient, initial_tenant: TenantId):
client.check_status() client.check_status()
# check initial tenant is there # check initial tenant is there
@@ -116,7 +117,7 @@ def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv):
def expect_updated_msg_lsn( def expect_updated_msg_lsn(
client: NeonPageserverHttpClient, client: PageserverHttpClient,
tenant_id: TenantId, tenant_id: TenantId,
timeline_id: TimelineId, timeline_id: TimelineId,
prev_msg_lsn: Optional[Lsn], prev_msg_lsn: Optional[Lsn],


@@ -15,10 +15,9 @@ from fixtures.neon_fixtures import (
available_remote_storages, available_remote_storages,
wait_for_last_record_lsn, wait_for_last_record_lsn,
wait_for_upload, wait_for_upload,
wait_until,
) )
from fixtures.types import Lsn, TenantId, TimelineId from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar from fixtures.utils import query_scalar, wait_until
# #


@@ -2,16 +2,12 @@ from threading import Thread
import pytest import pytest
from fixtures.log_helper import log from fixtures.log_helper import log
from fixtures.neon_fixtures import ( from fixtures.neon_fixtures import NeonEnvBuilder, PageserverApiException, PageserverHttpClient
NeonEnvBuilder,
NeonPageserverApiException,
NeonPageserverHttpClient,
)
from fixtures.types import TenantId, TimelineId from fixtures.types import TenantId, TimelineId
def do_gc_target( def do_gc_target(
pageserver_http: NeonPageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
): ):
"""Hack to unblock main, see https://github.com/neondatabase/neon/issues/2211""" """Hack to unblock main, see https://github.com/neondatabase/neon/issues/2211"""
try: try:
@@ -27,7 +23,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
# first check for non existing tenant # first check for non existing tenant
tenant_id = TenantId.generate() tenant_id = TenantId.generate()
with pytest.raises( with pytest.raises(
expected_exception=NeonPageserverApiException, expected_exception=PageserverApiException,
match=f"Tenant not found for id {tenant_id}", match=f"Tenant not found for id {tenant_id}",
): ):
pageserver_http.tenant_detach(tenant_id) pageserver_http.tenant_detach(tenant_id)
@@ -49,7 +45,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
# gc should not try to even start # gc should not try to even start
with pytest.raises( with pytest.raises(
expected_exception=NeonPageserverApiException, match="gc target timeline does not exist" expected_exception=PageserverApiException, match="gc target timeline does not exist"
): ):
bogus_timeline_id = TimelineId.generate() bogus_timeline_id = TimelineId.generate()
pageserver_http.timeline_gc(tenant_id, bogus_timeline_id, 0) pageserver_http.timeline_gc(tenant_id, bogus_timeline_id, 0)
@@ -78,6 +74,6 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
assert not (env.repo_dir / "tenants" / str(tenant_id)).exists() assert not (env.repo_dir / "tenants" / str(tenant_id)).exists()
with pytest.raises( with pytest.raises(
expected_exception=NeonPageserverApiException, match=f"Tenant {tenant_id} not found" expected_exception=PageserverApiException, match=f"Tenant {tenant_id} not found"
): ):
pageserver_http.timeline_gc(tenant_id, timeline_id, 0) pageserver_http.timeline_gc(tenant_id, timeline_id, 0)


@@ -1,7 +1,5 @@
import os import os
import pathlib import pathlib
import signal
import subprocess
import threading import threading
from contextlib import closing, contextmanager from contextlib import closing, contextmanager
from typing import Any, Dict, Optional, Tuple from typing import Any, Dict, Optional, Tuple
@@ -12,7 +10,7 @@ from fixtures.neon_fixtures import (
Etcd, Etcd,
NeonEnv, NeonEnv,
NeonEnvBuilder, NeonEnvBuilder,
NeonPageserverHttpClient, PageserverHttpClient,
PortDistributor, PortDistributor,
Postgres, Postgres,
assert_no_in_progress_downloads_for_tenant, assert_no_in_progress_downloads_for_tenant,
@@ -21,10 +19,9 @@ from fixtures.neon_fixtures import (
pg_distrib_dir, pg_distrib_dir,
wait_for_last_record_lsn, wait_for_last_record_lsn,
wait_for_upload, wait_for_upload,
wait_until,
) )
from fixtures.types import Lsn, TenantId, TimelineId from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar, subprocess_capture from fixtures.utils import query_scalar, start_in_background, subprocess_capture, wait_until
def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float): def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
@@ -32,7 +29,7 @@ def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
@contextmanager @contextmanager
def new_pageserver_helper( def new_pageserver_service(
new_pageserver_dir: pathlib.Path, new_pageserver_dir: pathlib.Path,
pageserver_bin: pathlib.Path, pageserver_bin: pathlib.Path,
remote_storage_mock_path: pathlib.Path, remote_storage_mock_path: pathlib.Path,
@@ -49,7 +46,6 @@ def new_pageserver_helper(
str(pageserver_bin), str(pageserver_bin),
"--workdir", "--workdir",
str(new_pageserver_dir), str(new_pageserver_dir),
"--daemonize",
"--update-config", "--update-config",
f"-c listen_pg_addr='localhost:{pg_port}'", f"-c listen_pg_addr='localhost:{pg_port}'",
f"-c listen_http_addr='localhost:{http_port}'", f"-c listen_http_addr='localhost:{http_port}'",
@@ -61,16 +57,26 @@ def new_pageserver_helper(
cmd.append( cmd.append(
f"-c broker_endpoints=['{broker.client_url()}']", f"-c broker_endpoints=['{broker.client_url()}']",
) )
pageserver_client = PageserverHttpClient(
log.info("starting new pageserver %s", cmd) port=http_port,
out = subprocess.check_output(cmd, text=True) auth_token=None,
log.info("started new pageserver %s", out) is_testing_enabled_or_skip=lambda: True, # TODO: check if testing really enabled
)
try: try:
yield pageserver_process = start_in_background(
cmd, new_pageserver_dir, "pageserver.log", pageserver_client.check_status
)
except Exception as e:
log.error(e)
# start_in_background kills the spawned process before raising, so there is nothing left to clean up here
raise Exception(f"Failed to start pageserver as {cmd}, reason: {e}") from e
log.info("new pageserver started")
try:
yield pageserver_process
finally: finally:
log.info("stopping new pageserver") log.info("stopping new pageserver")
pid = int((new_pageserver_dir / "pageserver.pid").read_text()) pageserver_process.kill()
os.kill(pid, signal.SIGQUIT)
@contextmanager @contextmanager
@@ -113,7 +119,7 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve
def populate_branch( def populate_branch(
pg: Postgres, pg: Postgres,
tenant_id: TenantId, tenant_id: TenantId,
ps_http: NeonPageserverHttpClient, ps_http: PageserverHttpClient,
create_table: bool, create_table: bool,
expected_sum: Optional[int], expected_sum: Optional[int],
) -> Tuple[TimelineId, Lsn]: ) -> Tuple[TimelineId, Lsn]:
@@ -146,7 +152,7 @@ def populate_branch(
def ensure_checkpoint( def ensure_checkpoint(
pageserver_http: NeonPageserverHttpClient, pageserver_http: PageserverHttpClient,
tenant_id: TenantId, tenant_id: TenantId,
timeline_id: TimelineId, timeline_id: TimelineId,
current_lsn: Lsn, current_lsn: Lsn,
@@ -159,7 +165,7 @@ def ensure_checkpoint(
def check_timeline_attached( def check_timeline_attached(
new_pageserver_http_client: NeonPageserverHttpClient, new_pageserver_http_client: PageserverHttpClient,
tenant_id: TenantId, tenant_id: TenantId,
timeline_id: TimelineId, timeline_id: TimelineId,
old_timeline_detail: Dict[str, Any], old_timeline_detail: Dict[str, Any],
@@ -346,13 +352,13 @@ def test_tenant_relocation(
log.info("new pageserver ports pg %s http %s", new_pageserver_pg_port, new_pageserver_http_port) log.info("new pageserver ports pg %s http %s", new_pageserver_pg_port, new_pageserver_http_port)
pageserver_bin = pathlib.Path(neon_binpath) / "pageserver" pageserver_bin = pathlib.Path(neon_binpath) / "pageserver"
new_pageserver_http = NeonPageserverHttpClient( new_pageserver_http = PageserverHttpClient(
port=new_pageserver_http_port, port=new_pageserver_http_port,
auth_token=None, auth_token=None,
is_testing_enabled_or_skip=env.pageserver.is_testing_enabled_or_skip, is_testing_enabled_or_skip=env.pageserver.is_testing_enabled_or_skip,
) )
with new_pageserver_helper( with new_pageserver_service(
new_pageserver_dir, new_pageserver_dir,
pageserver_bin, pageserver_bin,
remote_storage_mock_path, remote_storage_mock_path,


@@ -1,6 +1,7 @@
from fixtures.log_helper import log from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, wait_until from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.types import TenantId, TimelineId from fixtures.types import TenantId, TimelineId
from fixtures.utils import wait_until
def get_only_element(l): # noqa: E741 def get_only_element(l): # noqa: E741


@@ -25,10 +25,9 @@ from fixtures.neon_fixtures import (
available_remote_storages, available_remote_storages,
wait_for_last_record_lsn, wait_for_last_record_lsn,
wait_for_upload, wait_for_upload,
wait_until,
) )
from fixtures.types import Lsn, TenantId, TimelineId from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar from fixtures.utils import query_scalar, wait_until
async def tenant_workload(env: NeonEnv, pg: Postgres): async def tenant_workload(env: NeonEnv, pg: Postgres):


@@ -1,6 +1,7 @@
import pytest import pytest
from fixtures.neon_fixtures import NeonEnv, NeonPageserverApiException, wait_until from fixtures.neon_fixtures import NeonEnv, PageserverApiException
from fixtures.types import TenantId, TimelineId from fixtures.types import TenantId, TimelineId
from fixtures.utils import wait_until
def test_timeline_delete(neon_simple_env: NeonEnv): def test_timeline_delete(neon_simple_env: NeonEnv):
@@ -11,13 +12,13 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
# first try to delete non existing timeline # first try to delete non existing timeline
# for existing tenant: # for existing tenant:
invalid_timeline_id = TimelineId.generate() invalid_timeline_id = TimelineId.generate()
with pytest.raises(NeonPageserverApiException, match="timeline not found"): with pytest.raises(PageserverApiException, match="timeline not found"):
ps_http.timeline_delete(tenant_id=env.initial_tenant, timeline_id=invalid_timeline_id) ps_http.timeline_delete(tenant_id=env.initial_tenant, timeline_id=invalid_timeline_id)
# for non existing tenant: # for non existing tenant:
invalid_tenant_id = TenantId.generate() invalid_tenant_id = TenantId.generate()
with pytest.raises( with pytest.raises(
NeonPageserverApiException, PageserverApiException,
match=f"Tenant {invalid_tenant_id} not found in the local state", match=f"Tenant {invalid_tenant_id} not found in the local state",
): ):
ps_http.timeline_delete(tenant_id=invalid_tenant_id, timeline_id=invalid_timeline_id) ps_http.timeline_delete(tenant_id=invalid_tenant_id, timeline_id=invalid_timeline_id)
@@ -32,7 +33,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
ps_http = env.pageserver.http_client() ps_http = env.pageserver.http_client()
with pytest.raises( with pytest.raises(
NeonPageserverApiException, match="Cannot delete timeline which has child timelines" PageserverApiException, match="Cannot delete timeline which has child timelines"
): ):
timeline_path = ( timeline_path = (
@@ -64,7 +65,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
# check 404 # check 404
with pytest.raises( with pytest.raises(
NeonPageserverApiException, PageserverApiException,
match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found", match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found",
): ):
ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id) ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)


@@ -11,7 +11,7 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import ( from fixtures.neon_fixtures import (
NeonEnv, NeonEnv,
NeonEnvBuilder, NeonEnvBuilder,
NeonPageserverHttpClient, PageserverHttpClient,
PgBin, PgBin,
PortDistributor, PortDistributor,
Postgres, Postgres,
@@ -462,7 +462,7 @@ def assert_physical_size(env: NeonEnv, tenant_id: TenantId, timeline_id: Timelin
# Timeline logical size initialization is an asynchronous background task that runs once, # Timeline logical size initialization is an asynchronous background task that runs once,
# try a few times to ensure it's activated properly # try a few times to ensure it's activated properly
def wait_for_timeline_size_init( def wait_for_timeline_size_init(
client: NeonPageserverHttpClient, tenant: TenantId, timeline: TimelineId client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
): ):
for i in range(10): for i in range(10):
timeline_details = client.timeline_detail( timeline_details = client.timeline_detail(


@@ -27,6 +27,7 @@ from fixtures.neon_fixtures import (
RemoteStorageKind, RemoteStorageKind,
RemoteStorageUsers, RemoteStorageUsers,
Safekeeper, Safekeeper,
SafekeeperHttpClient,
SafekeeperPort, SafekeeperPort,
available_remote_storages, available_remote_storages,
neon_binpath, neon_binpath,
@@ -34,7 +35,7 @@ from fixtures.neon_fixtures import (
wait_for_upload, wait_for_upload,
) )
from fixtures.types import Lsn, TenantId, TimelineId from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import get_dir_size, query_scalar from fixtures.utils import get_dir_size, query_scalar, start_in_background
def wait_lsn_force_checkpoint( def wait_lsn_force_checkpoint(
@@ -841,7 +842,7 @@ class SafekeeperEnv:
safekeeper_dir = self.repo_dir / f"sk{i}" safekeeper_dir = self.repo_dir / f"sk{i}"
safekeeper_dir.mkdir(exist_ok=True) safekeeper_dir.mkdir(exist_ok=True)
args = [ cmd = [
self.bin_safekeeper, self.bin_safekeeper,
"-l", "-l",
f"127.0.0.1:{port.pg}", f"127.0.0.1:{port.pg}",
@@ -853,11 +854,22 @@ class SafekeeperEnv:
str(i), str(i),
"--broker-endpoints", "--broker-endpoints",
self.broker.client_url(), self.broker.client_url(),
"--daemonize",
] ]
log.info(f'Running command "{" ".join(cmd)}"')
log.info(f'Running command "{" ".join(args)}"') safekeeper_client = SafekeeperHttpClient(
return subprocess.run(args, check=True) port=port.http,
auth_token=None,
)
try:
safekeeper_process = start_in_background(
cmd, safekeeper_dir, "safekeeper.log", safekeeper_client.check_status
)
return safekeeper_process
except Exception as e:
log.error(e)
# start_in_background kills the spawned process before raising, so there is nothing left to clean up here
raise Exception(f"Failed to start safekeeper as {cmd}, reason: {e}") from e
def get_safekeeper_connstrs(self): def get_safekeeper_connstrs(self):
return ",".join([sk_proc.args[2] for sk_proc in self.safekeepers]) return ",".join([sk_proc.args[2] for sk_proc in self.safekeepers])