mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 03:52:56 +00:00
compute_ctl: Rearrange startup code (#11007)
Move most of the code to compute.rs, so that all the major startup steps are visible in one place. You can now get a pretty good picture of what happens in the latency-critical path at compute startup by reading ComputeNode::start_compute(). This also clarifies the error handling in start_compute. Previously, the start_postgres function sometimes returned an Err, and sometimes Ok but with the compute status already set to Failed. Now the start_compute function always returns Err on failure, and it's the caller's responsibility to change the compute status to Failed. Separately from that, it returns a handle to the Postgres process via a `&mut` reference if it had already started Postgres (i.e. on success, or if the failure happens after launching the Postgres process). --------- Co-authored-by: Alexey Kondratov <kondratov.aleksey@gmail.com>
This commit is contained in:
committed by
GitHub
parent
ee0c8ca8fd
commit
066324d6ec
@@ -33,39 +33,27 @@
|
||||
//! -b /usr/local/bin/postgres \
|
||||
//! -r http://pg-ext-s3-gateway \
|
||||
//! ```
|
||||
use std::collections::HashMap;
|
||||
use std::ffi::OsString;
|
||||
use std::fs::File;
|
||||
use std::path::Path;
|
||||
use std::process::exit;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::{Arc, Condvar, Mutex, RwLock, mpsc};
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::Utc;
|
||||
use clap::Parser;
|
||||
use compute_api::responses::{ComputeCtlConfig, ComputeStatus};
|
||||
use compute_api::responses::ComputeCtlConfig;
|
||||
use compute_api::spec::ComputeSpec;
|
||||
use compute_tools::compute::{
|
||||
ComputeNode, ComputeState, PG_PID, ParsedSpec, forward_termination_signal,
|
||||
};
|
||||
use compute_tools::configurator::launch_configurator;
|
||||
use compute_tools::disk_quota::set_disk_quota;
|
||||
use compute_tools::compute::{ComputeNode, ComputeNodeParams, forward_termination_signal};
|
||||
use compute_tools::extension_server::get_pg_version_string;
|
||||
use compute_tools::http::server::Server;
|
||||
use compute_tools::logger::*;
|
||||
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
|
||||
use compute_tools::monitor::launch_monitor;
|
||||
use compute_tools::params::*;
|
||||
use compute_tools::spec::*;
|
||||
use compute_tools::swap::resize_swap;
|
||||
use rlimit::{Resource, setrlimit};
|
||||
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
|
||||
use signal_hook::iterator::Signals;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{error, info};
|
||||
use url::Url;
|
||||
use utils::failpoint_support;
|
||||
|
||||
@@ -164,29 +152,40 @@ fn main() -> Result<()> {
|
||||
// enable core dumping for all child processes
|
||||
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
|
||||
|
||||
let (pg_handle, start_pg_result) = {
|
||||
// Enter startup tracing context
|
||||
let _startup_context_guard = startup_context_from_env();
|
||||
let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
|
||||
|
||||
let cli_spec = try_spec_from_cli(&cli)?;
|
||||
let cli_spec = try_spec_from_cli(&cli)?;
|
||||
|
||||
let compute = wait_spec(build_tag, &cli, cli_spec)?;
|
||||
let compute_node = ComputeNode::new(
|
||||
ComputeNodeParams {
|
||||
compute_id: cli.compute_id,
|
||||
connstr,
|
||||
pgdata: cli.pgdata.clone(),
|
||||
pgbin: cli.pgbin.clone(),
|
||||
pgversion: get_pg_version_string(&cli.pgbin),
|
||||
external_http_port: cli.external_http_port,
|
||||
internal_http_port: cli.internal_http_port,
|
||||
ext_remote_storage: cli.remote_ext_config.clone(),
|
||||
resize_swap_on_bind: cli.resize_swap_on_bind,
|
||||
set_disk_quota_for_fs: cli.set_disk_quota_for_fs,
|
||||
#[cfg(target_os = "linux")]
|
||||
filecache_connstr: cli.filecache_connstr,
|
||||
#[cfg(target_os = "linux")]
|
||||
cgroup: cli.cgroup,
|
||||
#[cfg(target_os = "linux")]
|
||||
vm_monitor_addr: cli.vm_monitor_addr,
|
||||
build_tag,
|
||||
|
||||
start_postgres(&cli, compute)?
|
||||
live_config_allowed: cli_spec.live_config_allowed,
|
||||
},
|
||||
cli_spec.spec,
|
||||
)?;
|
||||
|
||||
// Startup is finished, exit the startup tracing span
|
||||
};
|
||||
|
||||
// PostgreSQL is now running, if startup was successful. Wait until it exits.
|
||||
let wait_pg_result = wait_postgres(pg_handle)?;
|
||||
|
||||
let delay_exit = cleanup_after_postgres_exit(start_pg_result)?;
|
||||
|
||||
maybe_delay_exit(delay_exit);
|
||||
let exit_code = compute_node.run()?;
|
||||
|
||||
scenario.teardown();
|
||||
|
||||
deinit_and_exit(wait_pg_result);
|
||||
deinit_and_exit(exit_code);
|
||||
}
|
||||
|
||||
async fn init() -> Result<String> {
|
||||
@@ -207,56 +206,6 @@ async fn init() -> Result<String> {
|
||||
Ok(build_tag)
|
||||
}
|
||||
|
||||
fn startup_context_from_env() -> Option<opentelemetry::ContextGuard> {
|
||||
// Extract OpenTelemetry context for the startup actions from the
|
||||
// TRACEPARENT and TRACESTATE env variables, and attach it to the current
|
||||
// tracing context.
|
||||
//
|
||||
// This is used to propagate the context for the 'start_compute' operation
|
||||
// from the neon control plane. This allows linking together the wider
|
||||
// 'start_compute' operation that creates the compute container, with the
|
||||
// startup actions here within the container.
|
||||
//
|
||||
// There is no standard for passing context in env variables, but a lot of
|
||||
// tools use TRACEPARENT/TRACESTATE, so we use that convention too. See
|
||||
// https://github.com/open-telemetry/opentelemetry-specification/issues/740
|
||||
//
|
||||
// Switch to the startup context here, and exit it once the startup has
|
||||
// completed and Postgres is up and running.
|
||||
//
|
||||
// If this pod is pre-created without binding it to any particular endpoint
|
||||
// yet, this isn't the right place to enter the startup context. In that
|
||||
// case, the control plane should pass the tracing context as part of the
|
||||
// /configure API call.
|
||||
//
|
||||
// NOTE: This is supposed to only cover the *startup* actions. Once
|
||||
// postgres is configured and up-and-running, we exit this span. Any other
|
||||
// actions that are performed on incoming HTTP requests, for example, are
|
||||
// performed in separate spans.
|
||||
//
|
||||
// XXX: If the pod is restarted, we perform the startup actions in the same
|
||||
// context as the original startup actions, which probably doesn't make
|
||||
// sense.
|
||||
let mut startup_tracing_carrier: HashMap<String, String> = HashMap::new();
|
||||
if let Ok(val) = std::env::var("TRACEPARENT") {
|
||||
startup_tracing_carrier.insert("traceparent".to_string(), val);
|
||||
}
|
||||
if let Ok(val) = std::env::var("TRACESTATE") {
|
||||
startup_tracing_carrier.insert("tracestate".to_string(), val);
|
||||
}
|
||||
if !startup_tracing_carrier.is_empty() {
|
||||
use opentelemetry::propagation::TextMapPropagator;
|
||||
use opentelemetry_sdk::propagation::TraceContextPropagator;
|
||||
let guard = TraceContextPropagator::new()
|
||||
.extract(&startup_tracing_carrier)
|
||||
.attach();
|
||||
info!("startup tracing context attached");
|
||||
Some(guard)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
|
||||
// First, try to get cluster spec from the cli argument
|
||||
if let Some(ref spec_json) = cli.spec_json {
|
||||
@@ -307,357 +256,7 @@ struct CliSpecParams {
|
||||
live_config_allowed: bool,
|
||||
}
|
||||
|
||||
fn wait_spec(
|
||||
build_tag: String,
|
||||
cli: &Cli,
|
||||
CliSpecParams {
|
||||
spec,
|
||||
live_config_allowed,
|
||||
compute_ctl_config: _,
|
||||
}: CliSpecParams,
|
||||
) -> Result<Arc<ComputeNode>> {
|
||||
let mut new_state = ComputeState::new();
|
||||
let spec_set;
|
||||
|
||||
if let Some(spec) = spec {
|
||||
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
info!("new pspec.spec: {:?}", pspec.spec);
|
||||
new_state.pspec = Some(pspec);
|
||||
spec_set = true;
|
||||
} else {
|
||||
spec_set = false;
|
||||
}
|
||||
let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
|
||||
let conn_conf = postgres::config::Config::from_str(connstr.as_str())
|
||||
.context("cannot build postgres config from connstr")?;
|
||||
let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr.as_str())
|
||||
.context("cannot build tokio postgres config from connstr")?;
|
||||
let compute_node = ComputeNode {
|
||||
compute_id: cli.compute_id.clone(),
|
||||
connstr,
|
||||
conn_conf,
|
||||
tokio_conn_conf,
|
||||
pgdata: cli.pgdata.clone(),
|
||||
pgbin: cli.pgbin.clone(),
|
||||
pgversion: get_pg_version_string(&cli.pgbin),
|
||||
external_http_port: cli.external_http_port,
|
||||
internal_http_port: cli.internal_http_port,
|
||||
live_config_allowed,
|
||||
state: Mutex::new(new_state),
|
||||
state_changed: Condvar::new(),
|
||||
ext_remote_storage: cli.remote_ext_config.clone(),
|
||||
ext_download_progress: RwLock::new(HashMap::new()),
|
||||
build_tag,
|
||||
};
|
||||
let compute = Arc::new(compute_node);
|
||||
|
||||
// If this is a pooled VM, prewarm before starting HTTP server and becoming
|
||||
// available for binding. Prewarming helps Postgres start quicker later,
|
||||
// because QEMU will already have its memory allocated from the host, and
|
||||
// the necessary binaries will already be cached.
|
||||
if !spec_set {
|
||||
compute.prewarm_postgres()?;
|
||||
}
|
||||
|
||||
// Launch the external HTTP server first, so that we can serve control plane
|
||||
// requests while configuration is still in progress.
|
||||
Server::External(cli.external_http_port).launch(&compute);
|
||||
|
||||
// The internal HTTP server could be launched later, but there isn't much
|
||||
// sense in waiting.
|
||||
Server::Internal(cli.internal_http_port).launch(&compute);
|
||||
|
||||
if !spec_set {
|
||||
// No spec provided, hang waiting for it.
|
||||
info!("no compute spec provided, waiting");
|
||||
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
while state.status != ComputeStatus::ConfigurationPending {
|
||||
state = compute.state_changed.wait(state).unwrap();
|
||||
|
||||
if state.status == ComputeStatus::ConfigurationPending {
|
||||
info!("got spec, continue configuration");
|
||||
// Spec is already set by the http server handler.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Record for how long we slept waiting for the spec.
|
||||
let now = Utc::now();
|
||||
state.metrics.wait_for_spec_ms = now
|
||||
.signed_duration_since(state.start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
|
||||
// Reset start time, so that the total startup time that is calculated later will
|
||||
// not include the time that we waited for the spec.
|
||||
state.start_time = now;
|
||||
}
|
||||
|
||||
launch_lsn_lease_bg_task_for_static(&compute);
|
||||
|
||||
Ok(compute)
|
||||
}
|
||||
|
||||
fn start_postgres(
|
||||
cli: &Cli,
|
||||
compute: Arc<ComputeNode>,
|
||||
) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
|
||||
// We got all we need, update the state.
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
|
||||
// Create a tracing span for the startup operation.
|
||||
//
|
||||
// We could otherwise just annotate the function with #[instrument], but if
|
||||
// we're being configured from a /configure HTTP request, we want the
|
||||
// startup to be considered part of the /configure request.
|
||||
let _this_entered = {
|
||||
// Temporarily enter the /configure request's span, so that the new span
|
||||
// becomes its child.
|
||||
let _parent_entered = state.startup_span.take().map(|p| p.entered());
|
||||
|
||||
tracing::info_span!("start_postgres")
|
||||
}
|
||||
.entered();
|
||||
|
||||
state.set_status(ComputeStatus::Init, &compute.state_changed);
|
||||
|
||||
info!(
|
||||
"running compute with features: {:?}",
|
||||
state.pspec.as_ref().unwrap().spec.features
|
||||
);
|
||||
// before we release the mutex, fetch some parameters for later.
|
||||
let &ComputeSpec {
|
||||
swap_size_bytes,
|
||||
disk_quota_bytes,
|
||||
#[cfg(target_os = "linux")]
|
||||
disable_lfc_resizing,
|
||||
..
|
||||
} = &state.pspec.as_ref().unwrap().spec;
|
||||
drop(state);
|
||||
|
||||
// Launch remaining service threads
|
||||
let _monitor_handle = launch_monitor(&compute);
|
||||
let _configurator_handle = launch_configurator(&compute);
|
||||
|
||||
let mut prestartup_failed = false;
|
||||
let mut delay_exit = false;
|
||||
|
||||
// Resize swap to the desired size if the compute spec says so
|
||||
if let (Some(size_bytes), true) = (swap_size_bytes, cli.resize_swap_on_bind) {
|
||||
// To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
|
||||
// *before* starting postgres.
|
||||
//
|
||||
// In theory, we could do this asynchronously if SkipSwapon was enabled for VMs, but this
|
||||
// carries a risk of introducing hard-to-debug issues - e.g. if postgres sometimes gets
|
||||
// OOM-killed during startup because swap wasn't available yet.
|
||||
match resize_swap(size_bytes) {
|
||||
Ok(()) => {
|
||||
let size_mib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
|
||||
info!(%size_bytes, %size_mib, "resized swap");
|
||||
}
|
||||
Err(err) => {
|
||||
let err = err.context("failed to resize swap");
|
||||
error!("{err:#}");
|
||||
|
||||
// Mark compute startup as failed; don't try to start postgres, and report this
|
||||
// error to the control plane when it next asks.
|
||||
prestartup_failed = true;
|
||||
compute.set_failed_status(err);
|
||||
delay_exit = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set disk quota if the compute spec says so
|
||||
if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) =
|
||||
(disk_quota_bytes, cli.set_disk_quota_for_fs.as_ref())
|
||||
{
|
||||
match set_disk_quota(disk_quota_bytes, disk_quota_fs_mountpoint) {
|
||||
Ok(()) => {
|
||||
let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
|
||||
info!(%disk_quota_bytes, %size_mib, "set disk quota");
|
||||
}
|
||||
Err(err) => {
|
||||
let err = err.context("failed to set disk quota");
|
||||
error!("{err:#}");
|
||||
|
||||
// Mark compute startup as failed; don't try to start postgres, and report this
|
||||
// error to the control plane when it next asks.
|
||||
prestartup_failed = true;
|
||||
compute.set_failed_status(err);
|
||||
delay_exit = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Start Postgres
|
||||
let mut pg = None;
|
||||
if !prestartup_failed {
|
||||
pg = match compute.start_compute() {
|
||||
Ok(pg) => {
|
||||
info!(postmaster_pid = %pg.0.id(), "Postgres was started");
|
||||
Some(pg)
|
||||
}
|
||||
Err(err) => {
|
||||
error!("could not start the compute node: {:#}", err);
|
||||
compute.set_failed_status(err);
|
||||
delay_exit = true;
|
||||
None
|
||||
}
|
||||
};
|
||||
} else {
|
||||
warn!("skipping postgres startup because pre-startup step failed");
|
||||
}
|
||||
|
||||
// Start the vm-monitor if directed to. The vm-monitor only runs on linux
|
||||
// because it requires cgroups.
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(target_os = "linux")] {
|
||||
use std::env;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
// This token is used internally by the monitor to clean up all threads
|
||||
let token = CancellationToken::new();
|
||||
|
||||
// don't pass postgres connection string to vm-monitor if we don't want it to resize LFC
|
||||
let pgconnstr = if disable_lfc_resizing.unwrap_or(false) {
|
||||
None
|
||||
} else {
|
||||
Some(cli.filecache_connstr.clone())
|
||||
};
|
||||
|
||||
let vm_monitor = if env::var_os("AUTOSCALING").is_some() {
|
||||
let vm_monitor = tokio::spawn(vm_monitor::start(
|
||||
Box::leak(Box::new(vm_monitor::Args {
|
||||
cgroup: Some(cli.cgroup.clone()),
|
||||
pgconnstr,
|
||||
addr: cli.vm_monitor_addr.clone(),
|
||||
})),
|
||||
token.clone(),
|
||||
));
|
||||
Some(vm_monitor)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
Ok((
|
||||
pg,
|
||||
StartPostgresResult {
|
||||
delay_exit,
|
||||
compute,
|
||||
#[cfg(target_os = "linux")]
|
||||
token,
|
||||
#[cfg(target_os = "linux")]
|
||||
vm_monitor,
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
type PostgresHandle = (std::process::Child, tokio::task::JoinHandle<Result<()>>);
|
||||
|
||||
struct StartPostgresResult {
|
||||
delay_exit: bool,
|
||||
// passed through from WaitSpecResult
|
||||
compute: Arc<ComputeNode>,
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
token: tokio_util::sync::CancellationToken,
|
||||
#[cfg(target_os = "linux")]
|
||||
vm_monitor: Option<tokio::task::JoinHandle<Result<()>>>,
|
||||
}
|
||||
|
||||
fn wait_postgres(pg: Option<PostgresHandle>) -> Result<WaitPostgresResult> {
|
||||
// Wait for the child Postgres process forever. In this state Ctrl+C will
|
||||
// propagate to Postgres and it will be shut down as well.
|
||||
let mut exit_code = None;
|
||||
if let Some((mut pg, logs_handle)) = pg {
|
||||
info!(postmaster_pid = %pg.id(), "Waiting for Postgres to exit");
|
||||
|
||||
let ecode = pg
|
||||
.wait()
|
||||
.expect("failed to start waiting on Postgres process");
|
||||
PG_PID.store(0, Ordering::SeqCst);
|
||||
|
||||
// Process has exited. Wait for the log collecting task to finish.
|
||||
let _ = tokio::runtime::Handle::current()
|
||||
.block_on(logs_handle)
|
||||
.map_err(|e| tracing::error!("log task panicked: {:?}", e));
|
||||
|
||||
info!("Postgres exited with code {}, shutting down", ecode);
|
||||
exit_code = ecode.code()
|
||||
}
|
||||
|
||||
Ok(WaitPostgresResult { exit_code })
|
||||
}
|
||||
|
||||
struct WaitPostgresResult {
|
||||
exit_code: Option<i32>,
|
||||
}
|
||||
|
||||
fn cleanup_after_postgres_exit(
|
||||
StartPostgresResult {
|
||||
mut delay_exit,
|
||||
compute,
|
||||
#[cfg(target_os = "linux")]
|
||||
vm_monitor,
|
||||
#[cfg(target_os = "linux")]
|
||||
token,
|
||||
}: StartPostgresResult,
|
||||
) -> Result<bool> {
|
||||
// Terminate the vm_monitor so it releases the file watcher on
|
||||
// /sys/fs/cgroup/neon-postgres.
|
||||
// Note: the vm-monitor only runs on linux because it requires cgroups.
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(target_os = "linux")] {
|
||||
if let Some(handle) = vm_monitor {
|
||||
// Kills all threads spawned by the monitor
|
||||
token.cancel();
|
||||
// Kills the actual task running the monitor
|
||||
handle.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Maybe sync safekeepers again, to speed up next startup
|
||||
let compute_state = compute.state.lock().unwrap().clone();
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
|
||||
info!("syncing safekeepers on shutdown");
|
||||
let storage_auth_token = pspec.storage_auth_token.clone();
|
||||
let lsn = compute.sync_safekeepers(storage_auth_token)?;
|
||||
info!("synced safekeepers at lsn {lsn}");
|
||||
}
|
||||
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
if state.status == ComputeStatus::TerminationPending {
|
||||
state.status = ComputeStatus::Terminated;
|
||||
compute.state_changed.notify_all();
|
||||
// we were asked to terminate gracefully, don't exit to avoid restart
|
||||
delay_exit = true
|
||||
}
|
||||
drop(state);
|
||||
|
||||
if let Err(err) = compute.check_for_core_dumps() {
|
||||
error!("error while checking for core dumps: {err:?}");
|
||||
}
|
||||
|
||||
Ok(delay_exit)
|
||||
}
|
||||
|
||||
fn maybe_delay_exit(delay_exit: bool) {
|
||||
// If launch failed, keep serving HTTP requests for a while, so the cloud
|
||||
// control plane can get the actual error.
|
||||
if delay_exit {
|
||||
info!("giving control plane 30s to collect the error before shutdown");
|
||||
thread::sleep(Duration::from_secs(30));
|
||||
}
|
||||
}
|
||||
|
||||
fn deinit_and_exit(WaitPostgresResult { exit_code }: WaitPostgresResult) -> ! {
|
||||
fn deinit_and_exit(exit_code: Option<i32>) -> ! {
|
||||
// Shutdown trace pipeline gracefully, so that it has a chance to send any
|
||||
// pending traces before we exit. Shutting down OTEL tracing provider may
|
||||
// hang for quite some time, see, for example:
|
||||
|
||||
@@ -58,14 +58,14 @@ pub async fn get_database_schema(
|
||||
compute: &Arc<ComputeNode>,
|
||||
dbname: &str,
|
||||
) -> Result<impl Stream<Item = Result<bytes::Bytes, std::io::Error>> + use<>, SchemaDumpError> {
|
||||
let pgbin = &compute.pgbin;
|
||||
let pgbin = &compute.params.pgbin;
|
||||
let basepath = Path::new(pgbin).parent().unwrap();
|
||||
let pgdump = basepath.join("pg_dump");
|
||||
|
||||
// Replace the DB in the connection string and disable it to parts.
|
||||
// This is the only option to handle DBs with special characters.
|
||||
let conf =
|
||||
postgres_conf_for_db(&compute.connstr, dbname).map_err(|_| SchemaDumpError::Unexpected)?;
|
||||
let conf = postgres_conf_for_db(&compute.params.connstr, dbname)
|
||||
.map_err(|_| SchemaDumpError::Unexpected)?;
|
||||
let host = conf
|
||||
.get_hosts()
|
||||
.first()
|
||||
|
||||
@@ -28,28 +28,53 @@ use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::measured_stream::MeasuredReader;
|
||||
|
||||
use crate::configurator::launch_configurator;
|
||||
use crate::disk_quota::set_disk_quota;
|
||||
use crate::installed_extensions::get_installed_extensions;
|
||||
use crate::logger::startup_context_from_env;
|
||||
use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
|
||||
use crate::monitor::launch_monitor;
|
||||
use crate::pg_helpers::*;
|
||||
use crate::spec::*;
|
||||
use crate::swap::resize_swap;
|
||||
use crate::sync_sk::{check_if_synced, ping_safekeeper};
|
||||
use crate::{config, extension_server, local_proxy};
|
||||
|
||||
pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
|
||||
pub static PG_PID: AtomicU32 = AtomicU32::new(0);
|
||||
|
||||
/// Compute node info shared across several `compute_ctl` threads.
|
||||
pub struct ComputeNode {
|
||||
/// Static configuration params that don't change after startup. These mostly
|
||||
/// come from the CLI args, or are derived from them.
|
||||
pub struct ComputeNodeParams {
|
||||
/// The ID of the compute
|
||||
pub compute_id: String,
|
||||
// Url type maintains proper escaping
|
||||
pub connstr: url::Url,
|
||||
// We connect to Postgres from many different places, so build configs once
|
||||
// and reuse them where needed.
|
||||
pub conn_conf: postgres::config::Config,
|
||||
pub tokio_conn_conf: tokio_postgres::config::Config,
|
||||
|
||||
pub resize_swap_on_bind: bool,
|
||||
pub set_disk_quota_for_fs: Option<String>,
|
||||
|
||||
// VM monitor parameters
|
||||
#[cfg(target_os = "linux")]
|
||||
pub filecache_connstr: String,
|
||||
#[cfg(target_os = "linux")]
|
||||
pub cgroup: String,
|
||||
#[cfg(target_os = "linux")]
|
||||
pub vm_monitor_addr: String,
|
||||
|
||||
pub pgdata: String,
|
||||
pub pgbin: String,
|
||||
pub pgversion: String,
|
||||
pub build_tag: String,
|
||||
|
||||
/// The port that the compute's external HTTP server listens on
|
||||
pub external_http_port: u16,
|
||||
/// The port that the compute's internal HTTP server listens on
|
||||
pub internal_http_port: u16,
|
||||
|
||||
/// the address of extension storage proxy gateway
|
||||
pub ext_remote_storage: Option<String>,
|
||||
|
||||
/// We should only allow live re- / configuration of the compute node if
|
||||
/// it uses 'pull model', i.e. it can go to control-plane and fetch
|
||||
/// the latest configuration. Otherwise, there could be a case:
|
||||
@@ -63,10 +88,17 @@ pub struct ComputeNode {
|
||||
/// - we push spec and it does configuration
|
||||
/// - but then it is restarted without any spec again
|
||||
pub live_config_allowed: bool,
|
||||
/// The port that the compute's external HTTP server listens on
|
||||
pub external_http_port: u16,
|
||||
/// The port that the compute's internal HTTP server listens on
|
||||
pub internal_http_port: u16,
|
||||
}
|
||||
|
||||
/// Compute node info shared across several `compute_ctl` threads.
|
||||
pub struct ComputeNode {
|
||||
pub params: ComputeNodeParams,
|
||||
|
||||
// We connect to Postgres from many different places, so build configs once
|
||||
// and reuse them where needed. These are derived from 'params.connstr'
|
||||
pub conn_conf: postgres::config::Config,
|
||||
pub tokio_conn_conf: tokio_postgres::config::Config,
|
||||
|
||||
/// Volatile part of the `ComputeNode`, which should be used under `Mutex`.
|
||||
/// To allow HTTP API server to serving status requests, while configuration
|
||||
/// is in progress, lock should be held only for short periods of time to do
|
||||
@@ -74,11 +106,9 @@ pub struct ComputeNode {
|
||||
pub state: Mutex<ComputeState>,
|
||||
/// `Condvar` to allow notifying waiters about state changes.
|
||||
pub state_changed: Condvar,
|
||||
/// the address of extension storage proxy gateway
|
||||
pub ext_remote_storage: Option<String>,
|
||||
|
||||
// key: ext_archive_name, value: started download time, download_completed?
|
||||
pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
|
||||
pub build_tag: String,
|
||||
}
|
||||
|
||||
// store some metrics about download size that might impact startup time
|
||||
@@ -242,6 +272,25 @@ fn maybe_cgexec(cmd: &str) -> Command {
|
||||
}
|
||||
}
|
||||
|
||||
struct PostgresHandle {
|
||||
postgres: std::process::Child,
|
||||
log_collector: tokio::task::JoinHandle<Result<()>>,
|
||||
}
|
||||
|
||||
impl PostgresHandle {
|
||||
/// Return PID of the postgres (postmaster) process
|
||||
fn pid(&self) -> Pid {
|
||||
Pid::from_raw(self.postgres.id() as i32)
|
||||
}
|
||||
}
|
||||
|
||||
struct StartVmMonitorResult {
|
||||
#[cfg(target_os = "linux")]
|
||||
token: tokio_util::sync::CancellationToken,
|
||||
#[cfg(target_os = "linux")]
|
||||
vm_monitor: Option<tokio::task::JoinHandle<Result<()>>>,
|
||||
}
|
||||
|
||||
pub(crate) fn construct_superuser_query(spec: &ComputeSpec) -> String {
|
||||
let roles = spec
|
||||
.cluster
|
||||
@@ -316,6 +365,421 @@ pub(crate) fn construct_superuser_query(spec: &ComputeSpec) -> String {
|
||||
}
|
||||
|
||||
impl ComputeNode {
|
||||
pub fn new(params: ComputeNodeParams, cli_spec: Option<ComputeSpec>) -> Result<Self> {
|
||||
let connstr = params.connstr.as_str();
|
||||
let conn_conf = postgres::config::Config::from_str(connstr)
|
||||
.context("cannot build postgres config from connstr")?;
|
||||
let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr)
|
||||
.context("cannot build tokio postgres config from connstr")?;
|
||||
|
||||
let mut new_state = ComputeState::new();
|
||||
if let Some(cli_spec) = cli_spec {
|
||||
let pspec = ParsedSpec::try_from(cli_spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
new_state.pspec = Some(pspec);
|
||||
}
|
||||
|
||||
Ok(ComputeNode {
|
||||
params,
|
||||
conn_conf,
|
||||
tokio_conn_conf,
|
||||
state: Mutex::new(new_state),
|
||||
state_changed: Condvar::new(),
|
||||
ext_download_progress: RwLock::new(HashMap::new()),
|
||||
})
|
||||
}
|
||||
|
||||
/// Top-level control flow of compute_ctl. Returns a process exit code we should
|
||||
/// exit with.
|
||||
pub fn run(self) -> Result<Option<i32>> {
|
||||
let this = Arc::new(self);
|
||||
|
||||
let cli_spec = this.state.lock().unwrap().pspec.clone();
|
||||
|
||||
// If this is a pooled VM, prewarm before starting HTTP server and becoming
|
||||
// available for binding. Prewarming helps Postgres start quicker later,
|
||||
// because QEMU will already have its memory allocated from the host, and
|
||||
// the necessary binaries will already be cached.
|
||||
if cli_spec.is_none() {
|
||||
this.prewarm_postgres()?;
|
||||
}
|
||||
|
||||
// Launch the external HTTP server first, so that we can serve control plane
|
||||
// requests while configuration is still in progress.
|
||||
crate::http::server::Server::External(this.params.external_http_port).launch(&this);
|
||||
|
||||
// The internal HTTP server could be launched later, but there isn't much
|
||||
// sense in waiting.
|
||||
crate::http::server::Server::Internal(this.params.internal_http_port).launch(&this);
|
||||
|
||||
// If we got a spec from the CLI already, use that. Otherwise wait for the
|
||||
// control plane to pass it to us with a /configure HTTP request
|
||||
let pspec = if let Some(cli_spec) = cli_spec {
|
||||
cli_spec
|
||||
} else {
|
||||
this.wait_spec()?
|
||||
};
|
||||
|
||||
launch_lsn_lease_bg_task_for_static(&this);
|
||||
|
||||
// We have a spec, start the compute
|
||||
let mut delay_exit = false;
|
||||
let mut vm_monitor = None;
|
||||
let mut pg_process: Option<PostgresHandle> = None;
|
||||
|
||||
match this.start_compute(&mut pg_process) {
|
||||
Ok(()) => {
|
||||
// Success! Launch remaining services (just vm-monitor currently)
|
||||
vm_monitor =
|
||||
Some(this.start_vm_monitor(pspec.spec.disable_lfc_resizing.unwrap_or(false)));
|
||||
}
|
||||
Err(err) => {
|
||||
// Something went wrong with the startup. Log it and expose the error to
|
||||
// HTTP status requests.
|
||||
error!("could not start the compute node: {:#}", err);
|
||||
this.set_failed_status(err);
|
||||
delay_exit = true;
|
||||
|
||||
// If the error happened after starting PostgreSQL, kill it
|
||||
if let Some(ref pg_process) = pg_process {
|
||||
kill(pg_process.pid(), Signal::SIGQUIT).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If startup was successful, or it failed in the late stages,
|
||||
// PostgreSQL is now running. Wait until it exits.
|
||||
let exit_code = if let Some(pg_handle) = pg_process {
|
||||
let exit_status = this.wait_postgres(pg_handle);
|
||||
info!("Postgres exited with code {}, shutting down", exit_status);
|
||||
exit_status.code()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Terminate the vm_monitor so it releases the file watcher on
|
||||
// /sys/fs/cgroup/neon-postgres.
|
||||
// Note: the vm-monitor only runs on linux because it requires cgroups.
|
||||
if let Some(vm_monitor) = vm_monitor {
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(target_os = "linux")] {
|
||||
// Kills all threads spawned by the monitor
|
||||
vm_monitor.token.cancel();
|
||||
if let Some(handle) = vm_monitor.vm_monitor {
|
||||
// Kills the actual task running the monitor
|
||||
handle.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reap the postgres process
|
||||
delay_exit |= this.cleanup_after_postgres_exit()?;
|
||||
|
||||
// If launch failed, keep serving HTTP requests for a while, so the cloud
|
||||
// control plane can get the actual error.
|
||||
if delay_exit {
|
||||
info!("giving control plane 30s to collect the error before shutdown");
|
||||
std::thread::sleep(Duration::from_secs(30));
|
||||
}
|
||||
Ok(exit_code)
|
||||
}
|
||||
|
||||
pub fn wait_spec(&self) -> Result<ParsedSpec> {
|
||||
info!("no compute spec provided, waiting");
|
||||
let mut state = self.state.lock().unwrap();
|
||||
while state.status != ComputeStatus::ConfigurationPending {
|
||||
state = self.state_changed.wait(state).unwrap();
|
||||
}
|
||||
|
||||
info!("got spec, continue configuration");
|
||||
let spec = state.pspec.as_ref().unwrap().clone();
|
||||
|
||||
// Record for how long we slept waiting for the spec.
|
||||
let now = Utc::now();
|
||||
state.metrics.wait_for_spec_ms = now
|
||||
.signed_duration_since(state.start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
|
||||
// Reset start time, so that the total startup time that is calculated later will
|
||||
// not include the time that we waited for the spec.
|
||||
state.start_time = now;
|
||||
|
||||
Ok(spec)
|
||||
}
|
||||
|
||||
/// Start compute.
|
||||
///
|
||||
/// Prerequisites:
|
||||
/// - the compute spec has been placed in self.state.pspec
|
||||
///
|
||||
/// On success:
|
||||
/// - status is set to ComputeStatus::Running
|
||||
/// - self.running_postgres is set
|
||||
///
|
||||
/// On error:
|
||||
/// - status is left in ComputeStatus::Init. The caller is responsible for setting it to Failed
|
||||
/// - if Postgres was started before the fatal error happened, self.running_postgres is
|
||||
/// set. The caller is responsible for killing it.
|
||||
fn start_compute(self: &Arc<Self>, pg_handle: &mut Option<PostgresHandle>) -> Result<()> {
|
||||
let compute_state: ComputeState;
|
||||
|
||||
let _this_entered;
|
||||
{
|
||||
let mut state_guard = self.state.lock().unwrap();
|
||||
|
||||
// Create a tracing span for the startup operation.
|
||||
//
|
||||
// We could otherwise just annotate the function with #[instrument], but if
|
||||
// we're being configured from a /configure HTTP request, we want the
|
||||
// startup to be considered part of the /configure request.
|
||||
//
|
||||
// Similarly, if a trace ID was passed in env variables, attach it to the span.
|
||||
_this_entered = {
|
||||
// Temporarily enter the parent span, so that the new span becomes its child.
|
||||
if let Some(p) = state_guard.startup_span.take() {
|
||||
let _parent_entered = p.entered();
|
||||
tracing::info_span!("start_compute")
|
||||
} else if let Some(otel_context) = startup_context_from_env() {
|
||||
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
||||
let span = tracing::info_span!("start_compute");
|
||||
span.set_parent(otel_context);
|
||||
span
|
||||
} else {
|
||||
tracing::info_span!("start_compute")
|
||||
}
|
||||
}
|
||||
.entered();
|
||||
|
||||
state_guard.set_status(ComputeStatus::Init, &self.state_changed);
|
||||
compute_state = state_guard.clone()
|
||||
}
|
||||
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
info!(
|
||||
"starting compute for project {}, operation {}, tenant {}, timeline {}, features {:?}, spec.remote_extensions {:?}",
|
||||
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
|
||||
pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
|
||||
pspec.tenant_id,
|
||||
pspec.timeline_id,
|
||||
pspec.spec.features,
|
||||
pspec.spec.remote_extensions,
|
||||
);
|
||||
|
||||
// Launch remaining service threads
|
||||
let _monitor_handle = launch_monitor(self);
|
||||
let _configurator_handle = launch_configurator(self);
|
||||
|
||||
// Resize swap to the desired size if the compute spec says so
|
||||
if let (Some(size_bytes), true) =
|
||||
(pspec.spec.swap_size_bytes, self.params.resize_swap_on_bind)
|
||||
{
|
||||
// To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
|
||||
// *before* starting postgres.
|
||||
//
|
||||
// In theory, we could do this asynchronously if SkipSwapon was enabled for VMs, but this
|
||||
// carries a risk of introducing hard-to-debug issues - e.g. if postgres sometimes gets
|
||||
// OOM-killed during startup because swap wasn't available yet.
|
||||
resize_swap(size_bytes).context("failed to resize swap")?;
|
||||
let size_mib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
|
||||
info!(%size_bytes, %size_mib, "resized swap");
|
||||
}
|
||||
|
||||
// Set disk quota if the compute spec says so
|
||||
if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) = (
|
||||
pspec.spec.disk_quota_bytes,
|
||||
self.params.set_disk_quota_for_fs.as_ref(),
|
||||
) {
|
||||
set_disk_quota(disk_quota_bytes, disk_quota_fs_mountpoint)
|
||||
.context("failed to set disk quota")?;
|
||||
let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
|
||||
info!(%disk_quota_bytes, %size_mib, "set disk quota");
|
||||
}
|
||||
|
||||
// tune pgbouncer
|
||||
if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
|
||||
info!("tuning pgbouncer");
|
||||
|
||||
// Spawn a background task to do the tuning,
|
||||
// so that we don't block the main thread that starts Postgres.
|
||||
let pgbouncer_settings = pgbouncer_settings.clone();
|
||||
let _handle = tokio::spawn(async move {
|
||||
let res = tune_pgbouncer(pgbouncer_settings).await;
|
||||
if let Err(err) = res {
|
||||
error!("error while tuning pgbouncer: {err:?}");
|
||||
// Continue with the startup anyway
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// configure local_proxy
|
||||
if let Some(local_proxy) = &pspec.spec.local_proxy_config {
|
||||
info!("configuring local_proxy");
|
||||
|
||||
// Spawn a background task to do the configuration,
|
||||
// so that we don't block the main thread that starts Postgres.
|
||||
let local_proxy = local_proxy.clone();
|
||||
let _handle = tokio::spawn(async move {
|
||||
if let Err(err) = local_proxy::configure(&local_proxy) {
|
||||
error!("error while configuring local_proxy: {err:?}");
|
||||
// Continue with the startup anyway
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// This part is sync, because we need to download
|
||||
// remote shared_preload_libraries before postgres start (if any)
|
||||
if let Some(remote_extensions) = &pspec.spec.remote_extensions {
|
||||
// First, create control files for all availale extensions
|
||||
extension_server::create_control_files(remote_extensions, &self.params.pgbin);
|
||||
|
||||
let library_load_start_time = Utc::now();
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
let remote_ext_metrics = rt.block_on(self.prepare_preload_libraries(&pspec.spec))?;
|
||||
|
||||
let library_load_time = Utc::now()
|
||||
.signed_duration_since(library_load_start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.metrics.load_ext_ms = library_load_time;
|
||||
state.metrics.num_ext_downloaded = remote_ext_metrics.num_ext_downloaded;
|
||||
state.metrics.largest_ext_size = remote_ext_metrics.largest_ext_size;
|
||||
state.metrics.total_ext_download_size = remote_ext_metrics.total_ext_download_size;
|
||||
info!(
|
||||
"Loading shared_preload_libraries took {:?}ms",
|
||||
library_load_time
|
||||
);
|
||||
info!("{:?}", remote_ext_metrics);
|
||||
}
|
||||
|
||||
// Prepre pgdata directory. This downloads the basebackup, among other things.
|
||||
self.prepare_pgdata(&compute_state)?;
|
||||
|
||||
// Start Postgres
|
||||
let start_time = Utc::now();
|
||||
let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
|
||||
let postmaster_pid = pg_process.pid();
|
||||
*pg_handle = Some(pg_process);
|
||||
|
||||
// If this is a primary endpoint, perform some post-startup configuration before
|
||||
// opening it up for the world.
|
||||
let config_time = Utc::now();
|
||||
if pspec.spec.mode == ComputeMode::Primary {
|
||||
self.configure_as_primary(&compute_state)?;
|
||||
let conf = self.get_conn_conf(None);
|
||||
tokio::task::spawn_blocking(|| {
|
||||
let res = get_installed_extensions(conf);
|
||||
match res {
|
||||
Ok(extensions) => {
|
||||
info!(
|
||||
"[NEON_EXT_STAT] {}",
|
||||
serde_json::to_string(&extensions)
|
||||
.expect("failed to serialize extensions list")
|
||||
);
|
||||
}
|
||||
Err(err) => error!("could not get installed extensions: {err:?}"),
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// All done!
|
||||
let startup_end_time = Utc::now();
|
||||
let metrics = {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.metrics.start_postgres_ms = config_time
|
||||
.signed_duration_since(start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
state.metrics.config_ms = startup_end_time
|
||||
.signed_duration_since(config_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
state.metrics.total_startup_ms = startup_end_time
|
||||
.signed_duration_since(compute_state.start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
state.metrics.clone()
|
||||
};
|
||||
self.set_status(ComputeStatus::Running);
|
||||
|
||||
// Log metrics so that we can search for slow operations in logs
|
||||
info!(?metrics, postmaster_pid = %postmaster_pid, "compute start finished");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Start the vm-monitor if directed to. The vm-monitor only runs on linux
|
||||
/// because it requires cgroups.
|
||||
fn start_vm_monitor(&self, disable_lfc_resizing: bool) -> StartVmMonitorResult {
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(target_os = "linux")] {
|
||||
use std::env;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
// This token is used internally by the monitor to clean up all threads
|
||||
let token = CancellationToken::new();
|
||||
|
||||
// don't pass postgres connection string to vm-monitor if we don't want it to resize LFC
|
||||
let pgconnstr = if disable_lfc_resizing {
|
||||
None
|
||||
} else {
|
||||
Some(self.params.filecache_connstr.clone())
|
||||
};
|
||||
|
||||
let vm_monitor = if env::var_os("AUTOSCALING").is_some() {
|
||||
let vm_monitor = tokio::spawn(vm_monitor::start(
|
||||
Box::leak(Box::new(vm_monitor::Args {
|
||||
cgroup: Some(self.params.cgroup.clone()),
|
||||
pgconnstr,
|
||||
addr: self.params.vm_monitor_addr.clone(),
|
||||
})),
|
||||
token.clone(),
|
||||
));
|
||||
Some(vm_monitor)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
StartVmMonitorResult { token, vm_monitor }
|
||||
} else {
|
||||
StartVmMonitorResult { }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn cleanup_after_postgres_exit(&self) -> Result<bool> {
|
||||
// Maybe sync safekeepers again, to speed up next startup
|
||||
let compute_state = self.state.lock().unwrap().clone();
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
|
||||
info!("syncing safekeepers on shutdown");
|
||||
let storage_auth_token = pspec.storage_auth_token.clone();
|
||||
let lsn = self.sync_safekeepers(storage_auth_token)?;
|
||||
info!("synced safekeepers at lsn {lsn}");
|
||||
}
|
||||
|
||||
let mut delay_exit = false;
|
||||
let mut state = self.state.lock().unwrap();
|
||||
if state.status == ComputeStatus::TerminationPending {
|
||||
state.status = ComputeStatus::Terminated;
|
||||
self.state_changed.notify_all();
|
||||
// we were asked to terminate gracefully, don't exit to avoid restart
|
||||
delay_exit = true
|
||||
}
|
||||
drop(state);
|
||||
|
||||
if let Err(err) = self.check_for_core_dumps() {
|
||||
error!("error while checking for core dumps: {err:?}");
|
||||
}
|
||||
|
||||
Ok(delay_exit)
|
||||
}
|
||||
|
||||
/// Check that compute node has corresponding feature enabled.
|
||||
pub fn has_feature(&self, feature: ComputeFeature) -> bool {
|
||||
let state = self.state.lock().unwrap();
|
||||
@@ -354,9 +818,10 @@ impl ComputeNode {
|
||||
fn create_pgdata(&self) -> Result<()> {
|
||||
// Ignore removal error, likely it is a 'No such file or directory (os error 2)'.
|
||||
// If it is something different then create_dir() will error out anyway.
|
||||
let _ok = fs::remove_dir_all(&self.pgdata);
|
||||
fs::create_dir(&self.pgdata)?;
|
||||
fs::set_permissions(&self.pgdata, fs::Permissions::from_mode(0o700))?;
|
||||
let pgdata = &self.params.pgdata;
|
||||
let _ok = fs::remove_dir_all(pgdata);
|
||||
fs::create_dir(pgdata)?;
|
||||
fs::set_permissions(pgdata, fs::Permissions::from_mode(0o700))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -421,7 +886,7 @@ impl ComputeNode {
|
||||
// sends an Error after finishing the tarball, we will not notice it.
|
||||
let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(&mut bufreader));
|
||||
ar.set_ignore_zeros(true);
|
||||
ar.unpack(&self.pgdata)?;
|
||||
ar.unpack(&self.params.pgdata)?;
|
||||
|
||||
// Report metrics
|
||||
let mut state = self.state.lock().unwrap();
|
||||
@@ -566,9 +1031,9 @@ impl ComputeNode {
|
||||
pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
|
||||
let start_time = Utc::now();
|
||||
|
||||
let mut sync_handle = maybe_cgexec(&self.pgbin)
|
||||
let mut sync_handle = maybe_cgexec(&self.params.pgbin)
|
||||
.args(["--sync-safekeepers"])
|
||||
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode
|
||||
.env("PGDATA", &self.params.pgdata) // we cannot use -D in this mode
|
||||
.envs(if let Some(storage_auth_token) = &storage_auth_token {
|
||||
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
|
||||
} else {
|
||||
@@ -625,14 +1090,14 @@ impl ComputeNode {
|
||||
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = &pspec.spec;
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
let pgdata_path = Path::new(&self.params.pgdata);
|
||||
|
||||
// Remove/create an empty pgdata directory and put configuration there.
|
||||
self.create_pgdata()?;
|
||||
config::write_postgres_conf(
|
||||
&pgdata_path.join("postgresql.conf"),
|
||||
&pspec.spec,
|
||||
self.internal_http_port,
|
||||
self.params.internal_http_port,
|
||||
)?;
|
||||
|
||||
// Syncing safekeepers is only safe with primary nodes: if a primary
|
||||
@@ -732,12 +1197,15 @@ impl ComputeNode {
|
||||
info!("prewarming");
|
||||
|
||||
// Create pgdata
|
||||
let pgdata = &format!("{}.warmup", self.pgdata);
|
||||
let pgdata = &format!("{}.warmup", self.params.pgdata);
|
||||
create_pgdata(pgdata)?;
|
||||
|
||||
// Run initdb to completion
|
||||
info!("running initdb");
|
||||
let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
|
||||
let initdb_bin = Path::new(&self.params.pgbin)
|
||||
.parent()
|
||||
.unwrap()
|
||||
.join("initdb");
|
||||
Command::new(initdb_bin)
|
||||
.args(["--pgdata", pgdata])
|
||||
.output()
|
||||
@@ -753,7 +1221,7 @@ impl ComputeNode {
|
||||
|
||||
// Start postgres
|
||||
info!("starting postgres");
|
||||
let mut pg = maybe_cgexec(&self.pgbin)
|
||||
let mut pg = maybe_cgexec(&self.params.pgbin)
|
||||
.args(["-D", pgdata])
|
||||
.spawn()
|
||||
.expect("cannot start postgres process");
|
||||
@@ -780,15 +1248,12 @@ impl ComputeNode {
|
||||
///
|
||||
/// Returns a handle to the child process and a handle to the logs thread.
|
||||
#[instrument(skip_all)]
|
||||
pub fn start_postgres(
|
||||
&self,
|
||||
storage_auth_token: Option<String>,
|
||||
) -> Result<(std::process::Child, tokio::task::JoinHandle<Result<()>>)> {
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
pub fn start_postgres(&self, storage_auth_token: Option<String>) -> Result<PostgresHandle> {
|
||||
let pgdata_path = Path::new(&self.params.pgdata);
|
||||
|
||||
// Run postgres as a child process.
|
||||
let mut pg = maybe_cgexec(&self.pgbin)
|
||||
.args(["-D", &self.pgdata])
|
||||
let mut pg = maybe_cgexec(&self.params.pgbin)
|
||||
.args(["-D", &self.params.pgdata])
|
||||
.envs(if let Some(storage_auth_token) = &storage_auth_token {
|
||||
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
|
||||
} else {
|
||||
@@ -805,7 +1270,29 @@ impl ComputeNode {
|
||||
|
||||
wait_for_postgres(&mut pg, pgdata_path)?;
|
||||
|
||||
Ok((pg, logs_handle))
|
||||
Ok(PostgresHandle {
|
||||
postgres: pg,
|
||||
log_collector: logs_handle,
|
||||
})
|
||||
}
|
||||
|
||||
/// Wait for the child Postgres process forever. In this state Ctrl+C will
|
||||
/// propagate to Postgres and it will be shut down as well.
|
||||
fn wait_postgres(&self, mut pg_handle: PostgresHandle) -> std::process::ExitStatus {
|
||||
info!(postmaster_pid = %pg_handle.postgres.id(), "Waiting for Postgres to exit");
|
||||
|
||||
let ecode = pg_handle
|
||||
.postgres
|
||||
.wait()
|
||||
.expect("failed to start waiting on Postgres process");
|
||||
PG_PID.store(0, Ordering::SeqCst);
|
||||
|
||||
// Process has exited. Wait for the log collecting task to finish.
|
||||
let _ = tokio::runtime::Handle::current()
|
||||
.block_on(pg_handle.log_collector)
|
||||
.map_err(|e| tracing::error!("log task panicked: {:?}", e));
|
||||
|
||||
ecode
|
||||
}
|
||||
|
||||
/// Do post configuration of the already started Postgres. This function spawns a background task to
|
||||
@@ -972,9 +1459,12 @@ impl ComputeNode {
|
||||
// `pg_ctl` for start / stop.
|
||||
#[instrument(skip_all)]
|
||||
fn pg_reload_conf(&self) -> Result<()> {
|
||||
let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
|
||||
let pgctl_bin = Path::new(&self.params.pgbin)
|
||||
.parent()
|
||||
.unwrap()
|
||||
.join("pg_ctl");
|
||||
Command::new(pgctl_bin)
|
||||
.args(["reload", "-D", &self.pgdata])
|
||||
.args(["reload", "-D", &self.params.pgdata])
|
||||
.output()
|
||||
.expect("cannot run pg_ctl process");
|
||||
Ok(())
|
||||
@@ -1014,9 +1504,9 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
// Write new config
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
let pgdata_path = Path::new(&self.params.pgdata);
|
||||
let postgresql_conf_path = pgdata_path.join("postgresql.conf");
|
||||
config::write_postgres_conf(&postgresql_conf_path, &spec, self.internal_http_port)?;
|
||||
config::write_postgres_conf(&postgresql_conf_path, &spec, self.params.internal_http_port)?;
|
||||
|
||||
if !spec.skip_pg_catalog_updates {
|
||||
let max_concurrent_connections = spec.reconfigure_concurrency;
|
||||
@@ -1027,7 +1517,8 @@ impl ComputeNode {
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
if spec.mode == ComputeMode::Primary {
|
||||
let mut conf = tokio_postgres::Config::from_str(self.connstr.as_str()).unwrap();
|
||||
let mut conf =
|
||||
tokio_postgres::Config::from_str(self.params.connstr.as_str()).unwrap();
|
||||
conf.application_name("apply_config");
|
||||
let conf = Arc::new(conf);
|
||||
|
||||
@@ -1053,166 +1544,52 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn start_compute(
|
||||
&self,
|
||||
) -> Result<(std::process::Child, tokio::task::JoinHandle<Result<()>>)> {
|
||||
let compute_state = self.state.lock().unwrap().clone();
|
||||
pub fn configure_as_primary(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
info!(
|
||||
"starting compute for project {}, operation {}, tenant {}, timeline {}",
|
||||
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
|
||||
pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
|
||||
pspec.tenant_id,
|
||||
pspec.timeline_id,
|
||||
);
|
||||
|
||||
// tune pgbouncer
|
||||
if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
|
||||
info!("tuning pgbouncer");
|
||||
assert!(pspec.spec.mode == ComputeMode::Primary);
|
||||
if !pspec.spec.skip_pg_catalog_updates {
|
||||
let pgdata_path = Path::new(&self.params.pgdata);
|
||||
// temporarily reset max_cluster_size in config
|
||||
// to avoid the possibility of hitting the limit, while we are applying config:
|
||||
// creating new extensions, roles, etc...
|
||||
config::with_compute_ctl_tmp_override(pgdata_path, "neon.max_cluster_size=-1", || {
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
// Spawn a background task to do the tuning,
|
||||
// so that we don't block the main thread that starts Postgres.
|
||||
let pgbouncer_settings = pgbouncer_settings.clone();
|
||||
let _handle = tokio::spawn(async move {
|
||||
let res = tune_pgbouncer(pgbouncer_settings).await;
|
||||
if let Err(err) = res {
|
||||
error!("error while tuning pgbouncer: {err:?}");
|
||||
}
|
||||
});
|
||||
self.apply_config(compute_state)?;
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
let postgresql_conf_path = pgdata_path.join("postgresql.conf");
|
||||
if config::line_in_file(
|
||||
&postgresql_conf_path,
|
||||
"neon.disable_logical_replication_subscribers=false",
|
||||
)? {
|
||||
info!(
|
||||
"updated postgresql.conf to set neon.disable_logical_replication_subscribers=false"
|
||||
);
|
||||
}
|
||||
self.pg_reload_conf()?;
|
||||
}
|
||||
self.post_apply_config()?;
|
||||
|
||||
if let Some(local_proxy) = &pspec.spec.local_proxy_config {
|
||||
info!("configuring local_proxy");
|
||||
|
||||
// Spawn a background task to do the configuration,
|
||||
// so that we don't block the main thread that starts Postgres.
|
||||
let local_proxy = local_proxy.clone();
|
||||
let _handle = tokio::spawn(async move {
|
||||
if let Err(err) = local_proxy::configure(&local_proxy) {
|
||||
error!("error while configuring local_proxy: {err:?}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
info!(
|
||||
"start_compute spec.remote_extensions {:?}",
|
||||
pspec.spec.remote_extensions
|
||||
);
|
||||
|
||||
// This part is sync, because we need to download
|
||||
// remote shared_preload_libraries before postgres start (if any)
|
||||
if let Some(remote_extensions) = &pspec.spec.remote_extensions {
|
||||
// First, create control files for all availale extensions
|
||||
extension_server::create_control_files(remote_extensions, &self.pgbin);
|
||||
|
||||
let library_load_start_time = Utc::now();
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
let remote_ext_metrics = rt.block_on(self.prepare_preload_libraries(&pspec.spec))?;
|
||||
|
||||
let library_load_time = Utc::now()
|
||||
.signed_duration_since(library_load_start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.metrics.load_ext_ms = library_load_time;
|
||||
state.metrics.num_ext_downloaded = remote_ext_metrics.num_ext_downloaded;
|
||||
state.metrics.largest_ext_size = remote_ext_metrics.largest_ext_size;
|
||||
state.metrics.total_ext_download_size = remote_ext_metrics.total_ext_download_size;
|
||||
info!(
|
||||
"Loading shared_preload_libraries took {:?}ms",
|
||||
library_load_time
|
||||
);
|
||||
info!("{:?}", remote_ext_metrics);
|
||||
}
|
||||
|
||||
self.prepare_pgdata(&compute_state)?;
|
||||
|
||||
let start_time = Utc::now();
|
||||
let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
|
||||
|
||||
let config_time = Utc::now();
|
||||
if pspec.spec.mode == ComputeMode::Primary {
|
||||
if !pspec.spec.skip_pg_catalog_updates {
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
// temporarily reset max_cluster_size in config
|
||||
// to avoid the possibility of hitting the limit, while we are applying config:
|
||||
// creating new extensions, roles, etc...
|
||||
config::with_compute_ctl_tmp_override(
|
||||
pgdata_path,
|
||||
"neon.max_cluster_size=-1",
|
||||
|| {
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
self.apply_config(&compute_state)?;
|
||||
|
||||
Ok(())
|
||||
},
|
||||
)?;
|
||||
|
||||
let postgresql_conf_path = pgdata_path.join("postgresql.conf");
|
||||
if config::line_in_file(
|
||||
&postgresql_conf_path,
|
||||
"neon.disable_logical_replication_subscribers=false",
|
||||
)? {
|
||||
let conf = self.get_conn_conf(None);
|
||||
tokio::task::spawn_blocking(|| {
|
||||
let res = get_installed_extensions(conf);
|
||||
match res {
|
||||
Ok(extensions) => {
|
||||
info!(
|
||||
"updated postgresql.conf to set neon.disable_logical_replication_subscribers=false"
|
||||
"[NEON_EXT_STAT] {}",
|
||||
serde_json::to_string(&extensions)
|
||||
.expect("failed to serialize extensions list")
|
||||
);
|
||||
}
|
||||
self.pg_reload_conf()?;
|
||||
Err(err) => error!("could not get installed extensions: {err:?}"),
|
||||
}
|
||||
self.post_apply_config()?;
|
||||
});
|
||||
|
||||
let conf = self.get_conn_conf(None);
|
||||
tokio::task::spawn_blocking(|| {
|
||||
let res = get_installed_extensions(conf);
|
||||
match res {
|
||||
Ok(extensions) => {
|
||||
info!(
|
||||
"[NEON_EXT_STAT] {}",
|
||||
serde_json::to_string(&extensions)
|
||||
.expect("failed to serialize extensions list")
|
||||
);
|
||||
}
|
||||
Err(err) => error!("could not get installed extensions: {err:?}"),
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let startup_end_time = Utc::now();
|
||||
{
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.metrics.start_postgres_ms = config_time
|
||||
.signed_duration_since(start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
state.metrics.config_ms = startup_end_time
|
||||
.signed_duration_since(config_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
state.metrics.total_startup_ms = startup_end_time
|
||||
.signed_duration_since(compute_state.start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
}
|
||||
self.set_status(ComputeStatus::Running);
|
||||
|
||||
info!(
|
||||
"finished configuration of compute for project {}",
|
||||
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None")
|
||||
);
|
||||
|
||||
// Log metrics so that we can search for slow operations in logs
|
||||
let metrics = {
|
||||
let state = self.state.lock().unwrap();
|
||||
state.metrics.clone()
|
||||
};
|
||||
info!(?metrics, "compute start finished");
|
||||
|
||||
Ok(pg_process)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update the `last_active` in the shared state, but ensure that it's a more recent one.
|
||||
@@ -1241,7 +1618,7 @@ impl ComputeNode {
|
||||
pub fn check_for_core_dumps(&self) -> Result<()> {
|
||||
let core_dump_dir = match std::env::consts::OS {
|
||||
"macos" => Path::new("/cores/"),
|
||||
_ => Path::new(&self.pgdata),
|
||||
_ => Path::new(&self.params.pgdata),
|
||||
};
|
||||
|
||||
// Collect core dump paths if any
|
||||
@@ -1271,7 +1648,7 @@ impl ComputeNode {
|
||||
|
||||
// Try first with gdb
|
||||
let backtrace = Command::new("gdb")
|
||||
.args(["--batch", "-q", "-ex", "bt", &self.pgbin])
|
||||
.args(["--batch", "-q", "-ex", "bt", &self.params.pgbin])
|
||||
.arg(&core_path)
|
||||
.output();
|
||||
|
||||
@@ -1348,7 +1725,8 @@ LIMIT 100",
|
||||
ext_path: RemotePath,
|
||||
) -> Result<u64, DownloadError> {
|
||||
let ext_remote_storage =
|
||||
self.ext_remote_storage
|
||||
self.params
|
||||
.ext_remote_storage
|
||||
.as_ref()
|
||||
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
|
||||
"Remote extensions storage is not configured",
|
||||
@@ -1411,7 +1789,7 @@ LIMIT 100",
|
||||
&real_ext_name,
|
||||
&ext_path,
|
||||
ext_remote_storage,
|
||||
&self.pgbin,
|
||||
&self.params.pgbin,
|
||||
)
|
||||
.await
|
||||
.map_err(DownloadError::Other);
|
||||
@@ -1519,7 +1897,7 @@ LIMIT 100",
|
||||
&self,
|
||||
spec: &ComputeSpec,
|
||||
) -> Result<RemoteExtensionMetrics> {
|
||||
if self.ext_remote_storage.is_none() {
|
||||
if self.params.ext_remote_storage.is_none() {
|
||||
return Ok(RemoteExtensionMetrics {
|
||||
num_ext_downloaded: 0,
|
||||
largest_ext_size: 0,
|
||||
@@ -1570,8 +1948,12 @@ LIMIT 100",
|
||||
|
||||
let mut download_tasks = Vec::new();
|
||||
for library in &libs_vec {
|
||||
let (ext_name, ext_path) =
|
||||
remote_extensions.get_ext(library, true, &self.build_tag, &self.pgversion)?;
|
||||
let (ext_name, ext_path) = remote_extensions.get_ext(
|
||||
library,
|
||||
true,
|
||||
&self.params.build_tag,
|
||||
&self.params.pgversion,
|
||||
)?;
|
||||
download_tasks.push(self.download_extension(ext_name, ext_path));
|
||||
}
|
||||
let results = join_all(download_tasks).await;
|
||||
|
||||
@@ -22,7 +22,7 @@ pub(in crate::http) async fn configure(
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
request: Json<ConfigurationRequest>,
|
||||
) -> Response {
|
||||
if !compute.live_config_allowed {
|
||||
if !compute.params.live_config_allowed {
|
||||
return JsonResponse::error(
|
||||
StatusCode::PRECONDITION_FAILED,
|
||||
"live configuration is not allowed for this compute node".to_string(),
|
||||
|
||||
@@ -18,11 +18,11 @@ pub(in crate::http) struct ExtensionServerParams {
|
||||
/// Download a remote extension.
|
||||
pub(in crate::http) async fn download_extension(
|
||||
Path(filename): Path<String>,
|
||||
params: Query<ExtensionServerParams>,
|
||||
ext_server_params: Query<ExtensionServerParams>,
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
) -> Response {
|
||||
// Don't even try to download extensions if no remote storage is configured
|
||||
if compute.ext_remote_storage.is_none() {
|
||||
if compute.params.ext_remote_storage.is_none() {
|
||||
return JsonResponse::error(
|
||||
StatusCode::PRECONDITION_FAILED,
|
||||
"remote storage is not configured",
|
||||
@@ -46,9 +46,9 @@ pub(in crate::http) async fn download_extension(
|
||||
|
||||
remote_extensions.get_ext(
|
||||
&filename,
|
||||
params.is_library,
|
||||
&compute.build_tag,
|
||||
&compute.pgversion,
|
||||
ext_server_params.is_library,
|
||||
&compute.params.build_tag,
|
||||
&compute.params.pgversion,
|
||||
)
|
||||
};
|
||||
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use tracing::info;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
@@ -42,3 +44,50 @@ pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result
|
||||
pub fn inlinify(s: &str) -> String {
|
||||
s.replace('\n', "\u{200B}")
|
||||
}
|
||||
|
||||
pub fn startup_context_from_env() -> Option<opentelemetry::Context> {
|
||||
// Extract OpenTelemetry context for the startup actions from the
|
||||
// TRACEPARENT and TRACESTATE env variables, and attach it to the current
|
||||
// tracing context.
|
||||
//
|
||||
// This is used to propagate the context for the 'start_compute' operation
|
||||
// from the neon control plane. This allows linking together the wider
|
||||
// 'start_compute' operation that creates the compute container, with the
|
||||
// startup actions here within the container.
|
||||
//
|
||||
// There is no standard for passing context in env variables, but a lot of
|
||||
// tools use TRACEPARENT/TRACESTATE, so we use that convention too. See
|
||||
// https://github.com/open-telemetry/opentelemetry-specification/issues/740
|
||||
//
|
||||
// Switch to the startup context here, and exit it once the startup has
|
||||
// completed and Postgres is up and running.
|
||||
//
|
||||
// If this pod is pre-created without binding it to any particular endpoint
|
||||
// yet, this isn't the right place to enter the startup context. In that
|
||||
// case, the control plane should pass the tracing context as part of the
|
||||
// /configure API call.
|
||||
//
|
||||
// NOTE: This is supposed to only cover the *startup* actions. Once
|
||||
// postgres is configured and up-and-running, we exit this span. Any other
|
||||
// actions that are performed on incoming HTTP requests, for example, are
|
||||
// performed in separate spans.
|
||||
//
|
||||
// XXX: If the pod is restarted, we perform the startup actions in the same
|
||||
// context as the original startup actions, which probably doesn't make
|
||||
// sense.
|
||||
let mut startup_tracing_carrier: HashMap<String, String> = HashMap::new();
|
||||
if let Ok(val) = std::env::var("TRACEPARENT") {
|
||||
startup_tracing_carrier.insert("traceparent".to_string(), val);
|
||||
}
|
||||
if let Ok(val) = std::env::var("TRACESTATE") {
|
||||
startup_tracing_carrier.insert("tracestate".to_string(), val);
|
||||
}
|
||||
if !startup_tracing_carrier.is_empty() {
|
||||
use opentelemetry::propagation::TextMapPropagator;
|
||||
use opentelemetry_sdk::propagation::TraceContextPropagator;
|
||||
info!("got startup tracing context from env variables");
|
||||
Some(TraceContextPropagator::new().extract(&startup_tracing_carrier))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
|
||||
// should be handled gracefully.
|
||||
fn watch_compute_activity(compute: &ComputeNode) {
|
||||
// Suppose that `connstr` doesn't change
|
||||
let connstr = compute.connstr.clone();
|
||||
let connstr = compute.params.connstr.clone();
|
||||
let conf = compute.get_conn_conf(Some("compute_ctl:activity_monitor"));
|
||||
|
||||
// During startup and configuration we connect to every Postgres database,
|
||||
|
||||
Reference in New Issue
Block a user