Compare commits

...

1 Commits

Author SHA1 Message Date
Alexey Kondratov
6cfaa775b0 chore(compute): Minor compute_ctl startup refactoring 2025-02-26 21:55:25 +01:00
2 changed files with 151 additions and 133 deletions

View File

@@ -53,7 +53,6 @@ use compute_tools::compute::{
ComputeNode, ComputeState, PG_PID, ParsedSpec, forward_termination_signal,
};
use compute_tools::configurator::launch_configurator;
use compute_tools::disk_quota::set_disk_quota;
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::http::server::Server;
use compute_tools::logger::*;
@@ -61,11 +60,10 @@ use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::swap::resize_swap;
use rlimit::{Resource, setrlimit};
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
use signal_hook::iterator::Signals;
use tracing::{error, info, warn};
use tracing::{error, info};
use url::Url;
use utils::failpoint_support;
@@ -172,7 +170,11 @@ fn main() -> Result<()> {
let compute = wait_spec(build_tag, &cli, cli_spec)?;
start_postgres(&cli, compute)?
bootstrap_compute(
compute,
#[cfg(target_os = "linux")]
&cli,
)
// Startup is finished, exit the startup tracing span
};
@@ -348,6 +350,8 @@ fn wait_spec(
ext_remote_storage: cli.remote_ext_config.clone(),
ext_download_progress: RwLock::new(HashMap::new()),
build_tag,
resize_swap_on_bind: cli.resize_swap_on_bind,
set_disk_quota_for_fs: cli.set_disk_quota_for_fs.clone(),
};
let compute = Arc::new(compute_node);
@@ -400,10 +404,11 @@ fn wait_spec(
Ok(compute)
}
fn start_postgres(
cli: &Cli,
// Start Postgres and some aux threads like various monitors
fn bootstrap_compute(
compute: Arc<ComputeNode>,
) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
#[cfg(target_os = "linux")] cli: &Cli,
) -> (Option<PostgresHandle>, StartPostgresResult) {
// We got all we need, update the state.
let mut state = compute.state.lock().unwrap();
state.set_status(ComputeStatus::Init, &compute.state_changed);
@@ -412,10 +417,9 @@ fn start_postgres(
"running compute with features: {:?}",
state.pspec.as_ref().unwrap().spec.features
);
// before we release the mutex, fetch some parameters for later.
// Before we release the mutex, fetch some parameters for later.
let &ComputeSpec {
swap_size_bytes,
disk_quota_bytes,
#[cfg(target_os = "linux")]
disable_lfc_resizing,
..
@@ -426,120 +430,86 @@ fn start_postgres(
let _monitor_handle = launch_monitor(&compute);
let _configurator_handle = launch_configurator(&compute);
let mut prestartup_failed = false;
let mut delay_exit = false;
// Resize swap to the desired size if the compute spec says so
if let (Some(size_bytes), true) = (swap_size_bytes, cli.resize_swap_on_bind) {
// To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
// *before* starting postgres.
//
// In theory, we could do this asynchronously if SkipSwapon was enabled for VMs, but this
// carries a risk of introducing hard-to-debug issues - e.g. if postgres sometimes gets
// OOM-killed during startup because swap wasn't available yet.
match resize_swap(size_bytes) {
Ok(()) => {
let size_mib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%size_bytes, %size_mib, "resized swap");
}
Err(err) => {
let err = err.context("failed to resize swap");
error!("{err:#}");
// Mark compute startup as failed; don't try to start postgres, and report this
// error to the control plane when it next asks.
prestartup_failed = true;
compute.set_failed_status(err);
delay_exit = true;
}
}
}
// Set disk quota if the compute spec says so
if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) =
(disk_quota_bytes, cli.set_disk_quota_for_fs.as_ref())
{
match set_disk_quota(disk_quota_bytes, disk_quota_fs_mountpoint) {
Ok(()) => {
let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%disk_quota_bytes, %size_mib, "set disk quota");
}
Err(err) => {
let err = err.context("failed to set disk quota");
error!("{err:#}");
// Mark compute startup as failed; don't try to start postgres, and report this
// error to the control plane when it next asks.
prestartup_failed = true;
compute.set_failed_status(err);
delay_exit = true;
}
}
}
// Start Postgres
let mut pg = None;
if !prestartup_failed {
pg = match compute.start_compute() {
Ok(pg) => {
info!(postmaster_pid = %pg.0.id(), "Postgres was started");
Some(pg)
match compute.start_compute() {
Ok(pg) => {
info!(postmaster_pid = %pg.0.id(), "Postgres was started");
// Start the vm-monitor if directed to. The vm-monitor only runs on linux
// because it requires cgroups.
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
use std::env;
use tokio_util::sync::CancellationToken;
// This token is used internally by the monitor to clean up all threads
let token = CancellationToken::new();
// Don't pass postgres connection string to vm-monitor if we don't want it to resize LFC
let pgconnstr = if disable_lfc_resizing.unwrap_or(false) {
None
} else {
Some(cli.filecache_connstr.clone())
};
let vm_monitor = if env::var_os("AUTOSCALING").is_some() {
let vm_monitor = tokio::spawn(vm_monitor::start(
Box::leak(Box::new(vm_monitor::Args {
cgroup: Some(cli.cgroup.clone()),
pgconnstr,
addr: cli.vm_monitor_addr.clone(),
})),
token.clone(),
));
Some(vm_monitor)
} else {
None
};
}
}
Err(err) => {
error!("could not start the compute node: {:#}", err);
compute.set_failed_status(err);
delay_exit = true;
None
(
Some(pg),
StartPostgresResult {
delay_exit,
compute,
#[cfg(target_os = "linux")]
token,
#[cfg(target_os = "linux")]
vm_monitor,
},
)
}
Err(err) => {
error!("could not start the compute node: {:#}", err);
compute.set_failed_status(err);
delay_exit = true;
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
use tokio_util::sync::CancellationToken;
let token = CancellationToken::new();
(
None,
StartPostgresResult {
delay_exit,
compute,
token,
vm_monitor: None,
},
)
} else {
(None, StartPostgresResult {
delay_exit,
compute,
})
}
}
};
} else {
warn!("skipping postgres startup because pre-startup step failed");
}
// Start the vm-monitor if directed to. The vm-monitor only runs on linux
// because it requires cgroups.
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
use std::env;
use tokio_util::sync::CancellationToken;
// This token is used internally by the monitor to clean up all threads
let token = CancellationToken::new();
// don't pass postgres connection string to vm-monitor if we don't want it to resize LFC
let pgconnstr = if disable_lfc_resizing.unwrap_or(false) {
None
} else {
Some(cli.filecache_connstr.clone())
};
let vm_monitor = if env::var_os("AUTOSCALING").is_some() {
let vm_monitor = tokio::spawn(vm_monitor::start(
Box::leak(Box::new(vm_monitor::Args {
cgroup: Some(cli.cgroup.clone()),
pgconnstr,
addr: cli.vm_monitor_addr.clone(),
})),
token.clone(),
));
Some(vm_monitor)
} else {
None
};
}
}
Ok((
pg,
StartPostgresResult {
delay_exit,
compute,
#[cfg(target_os = "linux")]
token,
#[cfg(target_os = "linux")]
vm_monitor,
},
))
}
type PostgresHandle = (std::process::Child, tokio::task::JoinHandle<Result<()>>);

View File

@@ -31,6 +31,7 @@ use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::measured_stream::MeasuredReader;
use crate::disk_quota::set_disk_quota;
use crate::installed_extensions::get_installed_extensions;
use crate::pg_helpers::*;
use crate::spec::*;
@@ -44,9 +45,9 @@ use crate::spec_apply::PerDatabasePhase::{
ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
};
use crate::spec_apply::{DB, MutableApplyContext, PerDatabasePhase, apply_operations};
use crate::swap::resize_swap;
use crate::sync_sk::{check_if_synced, ping_safekeeper};
use crate::{config, extension_server, local_proxy};
pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
pub static PG_PID: AtomicU32 = AtomicU32::new(0);
@@ -92,6 +93,10 @@ pub struct ComputeNode {
// key: ext_archive_name, value: started download time, download_completed?
pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
pub build_tag: String,
/// Initialized from cli.resize_swap_on_bind
pub resize_swap_on_bind: bool,
/// Initialized from cli.set_disk_quota_for_fs
pub set_disk_quota_for_fs: Option<String>,
}
// store some metrics about download size that might impact startup time
@@ -1429,12 +1434,59 @@ impl ComputeNode {
Ok(())
}
/// Configure some VM parameters like swap and disk quota
#[instrument(skip_all)]
pub fn configure_vm(&self, spec: &ComputeSpec) -> Result<()> {
// Resize swap to the desired size if the compute spec says so
if let (Some(size_bytes), true) = (spec.swap_size_bytes, self.resize_swap_on_bind) {
// To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
// *before* starting postgres.
//
// In theory, we could do this asynchronously if SkipSwapon was enabled for VMs, but this
// carries a risk of introducing hard-to-debug issues - e.g. if postgres sometimes gets
// OOM-killed during startup because swap wasn't available yet.
match resize_swap(size_bytes) {
Ok(()) => {
let size_mib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%size_bytes, %size_mib, "resized swap");
}
Err(err) => {
let err = err.context("failed to resize swap");
error!("{err:#}");
return Err(err);
}
}
}
// Set disk quota if the compute spec says so
if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) =
(spec.disk_quota_bytes, self.set_disk_quota_for_fs.as_ref())
{
match set_disk_quota(disk_quota_bytes, disk_quota_fs_mountpoint) {
Ok(()) => {
let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%disk_quota_bytes, %size_mib, "set disk quota");
}
Err(err) => {
let err = err.context("failed to set disk quota");
error!("{err:#}");
return Err(err);
}
}
}
Ok(())
}
#[instrument(skip_all)]
pub fn start_compute(
&self,
) -> Result<(std::process::Child, tokio::task::JoinHandle<Result<()>>)> {
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
@@ -1443,9 +1495,11 @@ impl ComputeNode {
pspec.timeline_id,
);
// tune pgbouncer
self.configure_vm(&pspec.spec)?;
// Configure pgbouncer
if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
info!("tuning pgbouncer");
info!("configuring pgbouncer");
// Spawn a background task to do the tuning,
// so that we don't block the main thread that starts Postgres.
@@ -1471,14 +1525,11 @@ impl ComputeNode {
});
}
info!(
"start_compute spec.remote_extensions {:?}",
pspec.spec.remote_extensions
);
// This part is sync, because we need to download
// remote shared_preload_libraries before postgres start (if any)
if let Some(remote_extensions) = &pspec.spec.remote_extensions {
info!(?remote_extensions, "processing remote extensions");
// First, create control files for all availale extensions
extension_server::create_control_files(remote_extensions, &self.pgbin);
@@ -1497,7 +1548,7 @@ impl ComputeNode {
state.metrics.largest_ext_size = remote_ext_metrics.largest_ext_size;
state.metrics.total_ext_download_size = remote_ext_metrics.total_ext_download_size;
info!(
"Loading shared_preload_libraries took {:?}ms",
"loading shared_preload_libraries from remote extensions took {:?}ms",
library_load_time
);
info!("{:?}", remote_ext_metrics);
@@ -1556,6 +1607,7 @@ impl ComputeNode {
});
}
let metrics: ComputeMetrics;
let startup_end_time = Utc::now();
{
let mut state = self.state.lock().unwrap();
@@ -1574,21 +1626,17 @@ impl ComputeNode {
.to_std()
.unwrap()
.as_millis() as u64;
metrics = state.metrics.clone();
}
self.set_status(ComputeStatus::Running);
// Log metrics so that we can search for slow operations in logs
info!(
?metrics,
"finished configuration of compute for project {}",
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None")
);
// Log metrics so that we can search for slow operations in logs
let metrics = {
let state = self.state.lock().unwrap();
state.metrics.clone()
};
info!(?metrics, "compute start finished");
Ok(pg_process)
}