mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-11 07:22:55 +00:00
I've moved it to the API handler in the 589cf1ed2 to do not delay
compute start. Yet, we now skip full configuration and catalog updates
in the most hot path -- waking up suspended compute, and only do it at:
- first start
- start with applying new configuration
- start for availability check
so it doesn't really matter anymore.
The problem with creating the table and test record in the API handler
is that someone can fill up timeline till the logical limit. Then it's
suspended and availability check is scheduled, so it fails.
If table + test row are created at the very beginning, we reserve a 8 KB
page for future checks, which theoretically will last almost forever.
For example, my ~1y old branch still has 8 KB sized test table:
```sql
cloud_admin@postgres=# select pg_relation_size('health_check');
pg_relation_size
------------------
8192
(1 row)
```
---------
Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
1115 lines
42 KiB
Rust
1115 lines
42 KiB
Rust
use std::collections::HashMap;
|
|
use std::env;
|
|
use std::fs;
|
|
use std::io::BufRead;
|
|
use std::os::unix::fs::PermissionsExt;
|
|
use std::path::Path;
|
|
use std::process::{Command, Stdio};
|
|
use std::str::FromStr;
|
|
use std::sync::{Condvar, Mutex, RwLock};
|
|
use std::time::Instant;
|
|
|
|
use anyhow::{Context, Result};
|
|
use chrono::{DateTime, Utc};
|
|
use futures::future::join_all;
|
|
use futures::stream::FuturesUnordered;
|
|
use futures::StreamExt;
|
|
use postgres::{Client, NoTls};
|
|
use tokio;
|
|
use tokio_postgres;
|
|
use tracing::{error, info, instrument, warn};
|
|
use utils::id::{TenantId, TimelineId};
|
|
use utils::lsn::Lsn;
|
|
|
|
use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
|
use compute_api::spec::{ComputeMode, ComputeSpec};
|
|
use utils::measured_stream::MeasuredReader;
|
|
|
|
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
|
|
|
|
use crate::checker::create_availability_check_data;
|
|
use crate::pg_helpers::*;
|
|
use crate::spec::*;
|
|
use crate::sync_sk::{check_if_synced, ping_safekeeper};
|
|
use crate::{config, extension_server};
|
|
|
|
/// Compute node info shared across several `compute_ctl` threads.
|
|
pub struct ComputeNode {
|
|
// Url type maintains proper escaping
|
|
pub connstr: url::Url,
|
|
pub pgdata: String,
|
|
pub pgbin: String,
|
|
pub pgversion: String,
|
|
/// We should only allow live re- / configuration of the compute node if
|
|
/// it uses 'pull model', i.e. it can go to control-plane and fetch
|
|
/// the latest configuration. Otherwise, there could be a case:
|
|
/// - we start compute with some spec provided as argument
|
|
/// - we push new spec and it does reconfiguration
|
|
/// - but then something happens and compute pod / VM is destroyed,
|
|
/// so k8s controller starts it again with the **old** spec
|
|
/// and the same for empty computes:
|
|
/// - we started compute without any spec
|
|
/// - we push spec and it does configuration
|
|
/// - but then it is restarted without any spec again
|
|
pub live_config_allowed: bool,
|
|
/// Volatile part of the `ComputeNode`, which should be used under `Mutex`.
|
|
/// To allow HTTP API server to serving status requests, while configuration
|
|
/// is in progress, lock should be held only for short periods of time to do
|
|
/// read/write, not the whole configuration process.
|
|
pub state: Mutex<ComputeState>,
|
|
/// `Condvar` to allow notifying waiters about state changes.
|
|
pub state_changed: Condvar,
|
|
/// the S3 bucket that we search for extensions in
|
|
pub ext_remote_storage: Option<GenericRemoteStorage>,
|
|
// key: ext_archive_name, value: started download time, download_completed?
|
|
pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
|
|
pub build_tag: String,
|
|
}
|
|
|
|
// store some metrics about download size that might impact startup time
|
|
#[derive(Clone, Debug)]
|
|
pub struct RemoteExtensionMetrics {
|
|
num_ext_downloaded: u64,
|
|
largest_ext_size: u64,
|
|
total_ext_download_size: u64,
|
|
}
|
|
|
|
#[derive(Clone, Debug)]
|
|
pub struct ComputeState {
|
|
pub start_time: DateTime<Utc>,
|
|
pub status: ComputeStatus,
|
|
/// Timestamp of the last Postgres activity. It could be `None` if
|
|
/// compute wasn't used since start.
|
|
pub last_active: Option<DateTime<Utc>>,
|
|
pub error: Option<String>,
|
|
pub pspec: Option<ParsedSpec>,
|
|
pub metrics: ComputeMetrics,
|
|
}
|
|
|
|
impl ComputeState {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
start_time: Utc::now(),
|
|
status: ComputeStatus::Empty,
|
|
last_active: None,
|
|
error: None,
|
|
pspec: None,
|
|
metrics: ComputeMetrics::default(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Default for ComputeState {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Debug)]
|
|
pub struct ParsedSpec {
|
|
pub spec: ComputeSpec,
|
|
pub tenant_id: TenantId,
|
|
pub timeline_id: TimelineId,
|
|
pub pageserver_connstr: String,
|
|
pub safekeeper_connstrings: Vec<String>,
|
|
pub storage_auth_token: Option<String>,
|
|
}
|
|
|
|
impl TryFrom<ComputeSpec> for ParsedSpec {
|
|
type Error = String;
|
|
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
|
|
// Extract the options from the spec file that are needed to connect to
|
|
// the storage system.
|
|
//
|
|
// For backwards-compatibility, the top-level fields in the spec file
|
|
// may be empty. In that case, we need to dig them from the GUCs in the
|
|
// cluster.settings field.
|
|
let pageserver_connstr = spec
|
|
.pageserver_connstring
|
|
.clone()
|
|
.or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
|
|
.ok_or("pageserver connstr should be provided")?;
|
|
let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() {
|
|
if matches!(spec.mode, ComputeMode::Primary) {
|
|
spec.cluster
|
|
.settings
|
|
.find("neon.safekeepers")
|
|
.ok_or("safekeeper connstrings should be provided")?
|
|
.split(',')
|
|
.map(|str| str.to_string())
|
|
.collect()
|
|
} else {
|
|
vec![]
|
|
}
|
|
} else {
|
|
spec.safekeeper_connstrings.clone()
|
|
};
|
|
let storage_auth_token = spec.storage_auth_token.clone();
|
|
let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
|
|
tenant_id
|
|
} else {
|
|
spec.cluster
|
|
.settings
|
|
.find("neon.tenant_id")
|
|
.ok_or("tenant id should be provided")
|
|
.map(|s| TenantId::from_str(&s))?
|
|
.or(Err("invalid tenant id"))?
|
|
};
|
|
let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
|
|
timeline_id
|
|
} else {
|
|
spec.cluster
|
|
.settings
|
|
.find("neon.timeline_id")
|
|
.ok_or("timeline id should be provided")
|
|
.map(|s| TimelineId::from_str(&s))?
|
|
.or(Err("invalid timeline id"))?
|
|
};
|
|
|
|
Ok(ParsedSpec {
|
|
spec,
|
|
pageserver_connstr,
|
|
safekeeper_connstrings,
|
|
storage_auth_token,
|
|
tenant_id,
|
|
timeline_id,
|
|
})
|
|
}
|
|
}
|
|
|
|
/// If we are a VM, returns a [`Command`] that will run in the `neon-postgres`
|
|
/// cgroup. Otherwise returns the default `Command::new(cmd)`
|
|
///
|
|
/// This function should be used to start postgres, as it will start it in the
|
|
/// neon-postgres cgroup if we are a VM. This allows autoscaling to control
|
|
/// postgres' resource usage. The cgroup will exist in VMs because vm-builder
|
|
/// creates it during the sysinit phase of its inittab.
|
|
fn maybe_cgexec(cmd: &str) -> Command {
|
|
// The cplane sets this env var for autoscaling computes.
|
|
// use `var_os` so we don't have to worry about the variable being valid
|
|
// unicode. Should never be an concern . . . but just in case
|
|
if env::var_os("AUTOSCALING").is_some() {
|
|
let mut command = Command::new("cgexec");
|
|
command.args(["-g", "memory:neon-postgres"]);
|
|
command.arg(cmd);
|
|
command
|
|
} else {
|
|
Command::new(cmd)
|
|
}
|
|
}
|
|
|
|
/// Create special neon_superuser role, that's a slightly nerfed version of a real superuser
|
|
/// that we give to customers
|
|
fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
|
let roles = spec
|
|
.cluster
|
|
.roles
|
|
.iter()
|
|
.map(|r| escape_literal(&r.name))
|
|
.collect::<Vec<_>>();
|
|
|
|
let dbs = spec
|
|
.cluster
|
|
.databases
|
|
.iter()
|
|
.map(|db| escape_literal(&db.name))
|
|
.collect::<Vec<_>>();
|
|
|
|
let roles_decl = if roles.is_empty() {
|
|
String::from("roles text[] := NULL;")
|
|
} else {
|
|
format!(
|
|
r#"
|
|
roles text[] := ARRAY(SELECT rolname
|
|
FROM pg_catalog.pg_roles
|
|
WHERE rolname IN ({}));"#,
|
|
roles.join(", ")
|
|
)
|
|
};
|
|
|
|
let database_decl = if dbs.is_empty() {
|
|
String::from("dbs text[] := NULL;")
|
|
} else {
|
|
format!(
|
|
r#"
|
|
dbs text[] := ARRAY(SELECT datname
|
|
FROM pg_catalog.pg_database
|
|
WHERE datname IN ({}));"#,
|
|
dbs.join(", ")
|
|
)
|
|
};
|
|
|
|
// ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on all databases
|
|
// (see https://www.postgresql.org/docs/current/ddl-priv.html)
|
|
let query = format!(
|
|
r#"
|
|
DO $$
|
|
DECLARE
|
|
r text;
|
|
{}
|
|
{}
|
|
BEGIN
|
|
IF NOT EXISTS (
|
|
SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
|
|
THEN
|
|
CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN IN ROLE pg_read_all_data, pg_write_all_data;
|
|
IF array_length(roles, 1) IS NOT NULL THEN
|
|
EXECUTE format('GRANT neon_superuser TO %s',
|
|
array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', '));
|
|
FOREACH r IN ARRAY roles LOOP
|
|
EXECUTE format('ALTER ROLE %s CREATEROLE CREATEDB', quote_ident(r));
|
|
END LOOP;
|
|
END IF;
|
|
IF array_length(dbs, 1) IS NOT NULL THEN
|
|
EXECUTE format('GRANT ALL PRIVILEGES ON DATABASE %s TO neon_superuser',
|
|
array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(dbs) as x), ', '));
|
|
END IF;
|
|
END IF;
|
|
END
|
|
$$;"#,
|
|
roles_decl, database_decl,
|
|
);
|
|
info!("Neon superuser created:\n{}", &query);
|
|
client
|
|
.simple_query(&query)
|
|
.map_err(|e| anyhow::anyhow!(e).context(query))?;
|
|
Ok(())
|
|
}
|
|
|
|
impl ComputeNode {
|
|
pub fn set_status(&self, status: ComputeStatus) {
|
|
let mut state = self.state.lock().unwrap();
|
|
state.status = status;
|
|
self.state_changed.notify_all();
|
|
}
|
|
|
|
pub fn get_status(&self) -> ComputeStatus {
|
|
self.state.lock().unwrap().status
|
|
}
|
|
|
|
// Remove `pgdata` directory and create it again with right permissions.
|
|
fn create_pgdata(&self) -> Result<()> {
|
|
// Ignore removal error, likely it is a 'No such file or directory (os error 2)'.
|
|
// If it is something different then create_dir() will error out anyway.
|
|
let _ok = fs::remove_dir_all(&self.pgdata);
|
|
fs::create_dir(&self.pgdata)?;
|
|
fs::set_permissions(&self.pgdata, fs::Permissions::from_mode(0o700))?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// Get basebackup from the libpq connection to pageserver using `connstr` and
|
|
// unarchive it to `pgdata` directory overriding all its previous content.
|
|
#[instrument(skip_all, fields(%lsn))]
|
|
fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
|
|
let spec = compute_state.pspec.as_ref().expect("spec must be set");
|
|
let start_time = Instant::now();
|
|
|
|
let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
|
|
|
|
// Use the storage auth token from the config file, if given.
|
|
// Note: this overrides any password set in the connection string.
|
|
if let Some(storage_auth_token) = &spec.storage_auth_token {
|
|
info!("Got storage auth token from spec file");
|
|
config.password(storage_auth_token);
|
|
} else {
|
|
info!("Storage auth token not set");
|
|
}
|
|
|
|
// Connect to pageserver
|
|
let mut client = config.connect(NoTls)?;
|
|
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
|
|
|
|
let basebackup_cmd = match lsn {
|
|
// HACK We don't use compression on first start (Lsn(0)) because there's no API for it
|
|
Lsn(0) => format!("basebackup {} {}", spec.tenant_id, spec.timeline_id),
|
|
_ => format!(
|
|
"basebackup {} {} {} --gzip",
|
|
spec.tenant_id, spec.timeline_id, lsn
|
|
),
|
|
};
|
|
|
|
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
|
|
let mut measured_reader = MeasuredReader::new(copyreader);
|
|
|
|
// Check the magic number to see if it's a gzip or not. Even though
|
|
// we might explicitly ask for gzip, an old pageserver with no implementation
|
|
// of gzip compression might send us uncompressed data. After some time
|
|
// passes we can assume all pageservers know how to compress and we can
|
|
// delete this check.
|
|
//
|
|
// If the data is not gzip, it will be tar. It will not be mistakenly
|
|
// recognized as gzip because tar starts with an ascii encoding of a filename,
|
|
// and 0x1f and 0x8b are unlikely first characters for any filename. Moreover,
|
|
// we send the "global" directory first from the pageserver, so it definitely
|
|
// won't be recognized as gzip.
|
|
let mut bufreader = std::io::BufReader::new(&mut measured_reader);
|
|
let gzip = {
|
|
let peek = bufreader.fill_buf().unwrap();
|
|
peek[0] == 0x1f && peek[1] == 0x8b
|
|
};
|
|
|
|
// Read the archive directly from the `CopyOutReader`
|
|
//
|
|
// Set `ignore_zeros` so that unpack() reads all the Copy data and
|
|
// doesn't stop at the end-of-archive marker. Otherwise, if the server
|
|
// sends an Error after finishing the tarball, we will not notice it.
|
|
if gzip {
|
|
let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(&mut bufreader));
|
|
ar.set_ignore_zeros(true);
|
|
ar.unpack(&self.pgdata)?;
|
|
} else {
|
|
let mut ar = tar::Archive::new(&mut bufreader);
|
|
ar.set_ignore_zeros(true);
|
|
ar.unpack(&self.pgdata)?;
|
|
};
|
|
|
|
// Report metrics
|
|
let mut state = self.state.lock().unwrap();
|
|
state.metrics.pageserver_connect_micros = pageserver_connect_micros;
|
|
state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64;
|
|
state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn check_safekeepers_synced_async(
|
|
&self,
|
|
compute_state: &ComputeState,
|
|
) -> Result<Option<Lsn>> {
|
|
// Construct a connection config for each safekeeper
|
|
let pspec: ParsedSpec = compute_state
|
|
.pspec
|
|
.as_ref()
|
|
.expect("spec must be set")
|
|
.clone();
|
|
let sk_connstrs: Vec<String> = pspec.safekeeper_connstrings.clone();
|
|
let sk_configs = sk_connstrs.into_iter().map(|connstr| {
|
|
// Format connstr
|
|
let id = connstr.clone();
|
|
let connstr = format!("postgresql://no_user@{}", connstr);
|
|
let options = format!(
|
|
"-c timeline_id={} tenant_id={}",
|
|
pspec.timeline_id, pspec.tenant_id
|
|
);
|
|
|
|
// Construct client
|
|
let mut config = tokio_postgres::Config::from_str(&connstr).unwrap();
|
|
config.options(&options);
|
|
if let Some(storage_auth_token) = pspec.storage_auth_token.clone() {
|
|
config.password(storage_auth_token);
|
|
}
|
|
|
|
(id, config)
|
|
});
|
|
|
|
// Create task set to query all safekeepers
|
|
let mut tasks = FuturesUnordered::new();
|
|
let quorum = sk_configs.len() / 2 + 1;
|
|
for (id, config) in sk_configs {
|
|
let timeout = tokio::time::Duration::from_millis(100);
|
|
let task = tokio::time::timeout(timeout, ping_safekeeper(id, config));
|
|
tasks.push(tokio::spawn(task));
|
|
}
|
|
|
|
// Get a quorum of responses or errors
|
|
let mut responses = Vec::new();
|
|
let mut join_errors = Vec::new();
|
|
let mut task_errors = Vec::new();
|
|
let mut timeout_errors = Vec::new();
|
|
while let Some(response) = tasks.next().await {
|
|
match response {
|
|
Ok(Ok(Ok(r))) => responses.push(r),
|
|
Ok(Ok(Err(e))) => task_errors.push(e),
|
|
Ok(Err(e)) => timeout_errors.push(e),
|
|
Err(e) => join_errors.push(e),
|
|
};
|
|
if responses.len() >= quorum {
|
|
break;
|
|
}
|
|
if join_errors.len() + task_errors.len() + timeout_errors.len() >= quorum {
|
|
break;
|
|
}
|
|
}
|
|
|
|
// In case of error, log and fail the check, but don't crash.
|
|
// We're playing it safe because these errors could be transient
|
|
// and we don't yet retry. Also being careful here allows us to
|
|
// be backwards compatible with safekeepers that don't have the
|
|
// TIMELINE_STATUS API yet.
|
|
if responses.len() < quorum {
|
|
error!(
|
|
"failed sync safekeepers check {:?} {:?} {:?}",
|
|
join_errors, task_errors, timeout_errors
|
|
);
|
|
return Ok(None);
|
|
}
|
|
|
|
Ok(check_if_synced(responses))
|
|
}
|
|
|
|
// Fast path for sync_safekeepers. If they're already synced we get the lsn
|
|
// in one roundtrip. If not, we should do a full sync_safekeepers.
|
|
pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
|
|
let start_time = Utc::now();
|
|
|
|
// Run actual work with new tokio runtime
|
|
let rt = tokio::runtime::Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.expect("failed to create rt");
|
|
let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
|
|
|
|
// Record runtime
|
|
self.state.lock().unwrap().metrics.sync_sk_check_ms = Utc::now()
|
|
.signed_duration_since(start_time)
|
|
.to_std()
|
|
.unwrap()
|
|
.as_millis() as u64;
|
|
result
|
|
}
|
|
|
|
// Run `postgres` in a special mode with `--sync-safekeepers` argument
|
|
// and return the reported LSN back to the caller.
|
|
#[instrument(skip_all)]
|
|
pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
|
|
let start_time = Utc::now();
|
|
|
|
let sync_handle = maybe_cgexec(&self.pgbin)
|
|
.args(["--sync-safekeepers"])
|
|
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode
|
|
.envs(if let Some(storage_auth_token) = &storage_auth_token {
|
|
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
|
|
} else {
|
|
vec![]
|
|
})
|
|
.stdout(Stdio::piped())
|
|
.spawn()
|
|
.expect("postgres --sync-safekeepers failed to start");
|
|
|
|
// `postgres --sync-safekeepers` will print all log output to stderr and
|
|
// final LSN to stdout. So we pipe only stdout, while stderr will be automatically
|
|
// redirected to the caller output.
|
|
let sync_output = sync_handle
|
|
.wait_with_output()
|
|
.expect("postgres --sync-safekeepers failed");
|
|
|
|
if !sync_output.status.success() {
|
|
anyhow::bail!(
|
|
"postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}",
|
|
sync_output.status,
|
|
String::from_utf8(sync_output.stdout)
|
|
.expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
|
|
);
|
|
}
|
|
|
|
self.state.lock().unwrap().metrics.sync_safekeepers_ms = Utc::now()
|
|
.signed_duration_since(start_time)
|
|
.to_std()
|
|
.unwrap()
|
|
.as_millis() as u64;
|
|
|
|
let lsn = Lsn::from_str(String::from_utf8(sync_output.stdout)?.trim())?;
|
|
|
|
Ok(lsn)
|
|
}
|
|
|
|
/// Do all the preparations like PGDATA directory creation, configuration,
|
|
/// safekeepers sync, basebackup, etc.
|
|
#[instrument(skip_all)]
|
|
pub fn prepare_pgdata(
|
|
&self,
|
|
compute_state: &ComputeState,
|
|
extension_server_port: u16,
|
|
) -> Result<()> {
|
|
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
|
let spec = &pspec.spec;
|
|
let pgdata_path = Path::new(&self.pgdata);
|
|
|
|
// Remove/create an empty pgdata directory and put configuration there.
|
|
self.create_pgdata()?;
|
|
config::write_postgres_conf(
|
|
&pgdata_path.join("postgresql.conf"),
|
|
&pspec.spec,
|
|
Some(extension_server_port),
|
|
)?;
|
|
|
|
// Syncing safekeepers is only safe with primary nodes: if a primary
|
|
// is already connected it will be kicked out, so a secondary (standby)
|
|
// cannot sync safekeepers.
|
|
let lsn = match spec.mode {
|
|
ComputeMode::Primary => {
|
|
info!("checking if safekeepers are synced");
|
|
let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state) {
|
|
lsn
|
|
} else {
|
|
info!("starting safekeepers syncing");
|
|
self.sync_safekeepers(pspec.storage_auth_token.clone())
|
|
.with_context(|| "failed to sync safekeepers")?
|
|
};
|
|
info!("safekeepers synced at LSN {}", lsn);
|
|
lsn
|
|
}
|
|
ComputeMode::Static(lsn) => {
|
|
info!("Starting read-only node at static LSN {}", lsn);
|
|
lsn
|
|
}
|
|
ComputeMode::Replica => {
|
|
info!("Initializing standby from latest Pageserver LSN");
|
|
Lsn(0)
|
|
}
|
|
};
|
|
|
|
info!(
|
|
"getting basebackup@{} from pageserver {}",
|
|
lsn, &pspec.pageserver_connstr
|
|
);
|
|
self.get_basebackup(compute_state, lsn).with_context(|| {
|
|
format!(
|
|
"failed to get basebackup@{} from pageserver {}",
|
|
lsn, &pspec.pageserver_connstr
|
|
)
|
|
})?;
|
|
|
|
// Update pg_hba.conf received with basebackup.
|
|
update_pg_hba(pgdata_path)?;
|
|
|
|
match spec.mode {
|
|
ComputeMode::Primary => {}
|
|
ComputeMode::Replica | ComputeMode::Static(..) => {
|
|
add_standby_signal(pgdata_path)?;
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Start and stop a postgres process to warm up the VM for startup.
|
|
pub fn prewarm_postgres(&self) -> Result<()> {
|
|
info!("prewarming");
|
|
|
|
// Create pgdata
|
|
let pgdata = &format!("{}.warmup", self.pgdata);
|
|
create_pgdata(pgdata)?;
|
|
|
|
// Run initdb to completion
|
|
info!("running initdb");
|
|
let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
|
|
Command::new(initdb_bin)
|
|
.args(["-D", pgdata])
|
|
.output()
|
|
.expect("cannot start initdb process");
|
|
|
|
// Write conf
|
|
use std::io::Write;
|
|
let conf_path = Path::new(pgdata).join("postgresql.conf");
|
|
let mut file = std::fs::File::create(conf_path)?;
|
|
writeln!(file, "shared_buffers=65536")?;
|
|
writeln!(file, "port=51055")?; // Nobody should be connecting
|
|
writeln!(file, "shared_preload_libraries = 'neon'")?;
|
|
|
|
// Start postgres
|
|
info!("starting postgres");
|
|
let mut pg = maybe_cgexec(&self.pgbin)
|
|
.args(["-D", pgdata])
|
|
.spawn()
|
|
.expect("cannot start postgres process");
|
|
|
|
// Stop it when it's ready
|
|
info!("waiting for postgres");
|
|
wait_for_postgres(&mut pg, Path::new(pgdata))?;
|
|
pg.kill()?;
|
|
info!("sent kill signal");
|
|
pg.wait()?;
|
|
info!("done prewarming");
|
|
|
|
// clean up
|
|
let _ok = fs::remove_dir_all(pgdata);
|
|
Ok(())
|
|
}
|
|
|
|
/// Start Postgres as a child process and manage DBs/roles.
|
|
/// After that this will hang waiting on the postmaster process to exit.
|
|
#[instrument(skip_all)]
|
|
pub fn start_postgres(
|
|
&self,
|
|
storage_auth_token: Option<String>,
|
|
) -> Result<std::process::Child> {
|
|
let pgdata_path = Path::new(&self.pgdata);
|
|
|
|
// Run postgres as a child process.
|
|
let mut pg = maybe_cgexec(&self.pgbin)
|
|
.args(["-D", &self.pgdata])
|
|
.envs(if let Some(storage_auth_token) = &storage_auth_token {
|
|
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
|
|
} else {
|
|
vec![]
|
|
})
|
|
.spawn()
|
|
.expect("cannot start postgres process");
|
|
|
|
wait_for_postgres(&mut pg, pgdata_path)?;
|
|
|
|
Ok(pg)
|
|
}
|
|
|
|
/// Do initial configuration of the already started Postgres.
|
|
#[instrument(skip_all)]
|
|
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
|
|
// If connection fails,
|
|
// it may be the old node with `zenith_admin` superuser.
|
|
//
|
|
// In this case we need to connect with old `zenith_admin` name
|
|
// and create new user. We cannot simply rename connected user,
|
|
// but we can create a new one and grant it all privileges.
|
|
let mut client = match Client::connect(self.connstr.as_str(), NoTls) {
|
|
Err(e) => {
|
|
info!(
|
|
"cannot connect to postgres: {}, retrying with `zenith_admin` username",
|
|
e
|
|
);
|
|
let mut zenith_admin_connstr = self.connstr.clone();
|
|
|
|
zenith_admin_connstr
|
|
.set_username("zenith_admin")
|
|
.map_err(|_| anyhow::anyhow!("invalid connstr"))?;
|
|
|
|
let mut client = Client::connect(zenith_admin_connstr.as_str(), NoTls)?;
|
|
// Disable forwarding so that users don't get a cloud_admin role
|
|
client.simple_query("SET neon.forward_ddl = false")?;
|
|
client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?;
|
|
client.simple_query("GRANT zenith_admin TO cloud_admin")?;
|
|
drop(client);
|
|
|
|
// reconnect with connsting with expected name
|
|
Client::connect(self.connstr.as_str(), NoTls)?
|
|
}
|
|
Ok(client) => client,
|
|
};
|
|
|
|
// Disable DDL forwarding because control plane already knows about these roles/databases.
|
|
client.simple_query("SET neon.forward_ddl = false")?;
|
|
|
|
// Proceed with post-startup configuration. Note, that order of operations is important.
|
|
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
|
|
create_neon_superuser(spec, &mut client)?;
|
|
handle_roles(spec, &mut client)?;
|
|
handle_databases(spec, &mut client)?;
|
|
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
|
|
handle_grants(spec, self.connstr.as_str())?;
|
|
handle_extensions(spec, &mut client)?;
|
|
create_availability_check_data(&mut client)?;
|
|
|
|
// 'Close' connection
|
|
drop(client);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// We could've wrapped this around `pg_ctl reload`, but right now we don't use
|
|
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
|
|
// have opened connection to Postgres and superuser access.
|
|
#[instrument(skip_all)]
|
|
fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
|
|
client.simple_query("SELECT pg_reload_conf()")?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Similar to `apply_config()`, but does a bit different sequence of operations,
|
|
/// as it's used to reconfigure a previously started and configured Postgres node.
|
|
#[instrument(skip_all)]
|
|
pub fn reconfigure(&self) -> Result<()> {
|
|
let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
|
|
|
|
// Write new config
|
|
let pgdata_path = Path::new(&self.pgdata);
|
|
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
|
|
|
|
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
|
self.pg_reload_conf(&mut client)?;
|
|
|
|
// Proceed with post-startup configuration. Note, that order of operations is important.
|
|
// Disable DDL forwarding because control plane already knows about these roles/databases.
|
|
if spec.mode == ComputeMode::Primary {
|
|
client.simple_query("SET neon.forward_ddl = false")?;
|
|
handle_roles(&spec, &mut client)?;
|
|
handle_databases(&spec, &mut client)?;
|
|
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
|
|
handle_grants(&spec, self.connstr.as_str())?;
|
|
handle_extensions(&spec, &mut client)?;
|
|
}
|
|
|
|
// 'Close' connection
|
|
drop(client);
|
|
|
|
let unknown_op = "unknown".to_string();
|
|
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
|
|
info!(
|
|
"finished reconfiguration of compute node for operation {}",
|
|
op_id
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[instrument(skip_all)]
|
|
pub fn start_compute(&self, extension_server_port: u16) -> Result<std::process::Child> {
|
|
let compute_state = self.state.lock().unwrap().clone();
|
|
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
|
info!(
|
|
"starting compute for project {}, operation {}, tenant {}, timeline {}",
|
|
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
|
|
pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
|
|
pspec.tenant_id,
|
|
pspec.timeline_id,
|
|
);
|
|
|
|
info!(
|
|
"start_compute spec.remote_extensions {:?}",
|
|
pspec.spec.remote_extensions
|
|
);
|
|
|
|
// This part is sync, because we need to download
|
|
// remote shared_preload_libraries before postgres start (if any)
|
|
if let Some(remote_extensions) = &pspec.spec.remote_extensions {
|
|
// First, create control files for all availale extensions
|
|
extension_server::create_control_files(remote_extensions, &self.pgbin);
|
|
|
|
let library_load_start_time = Utc::now();
|
|
let remote_ext_metrics = self.prepare_preload_libraries(&pspec.spec)?;
|
|
|
|
let library_load_time = Utc::now()
|
|
.signed_duration_since(library_load_start_time)
|
|
.to_std()
|
|
.unwrap()
|
|
.as_millis() as u64;
|
|
let mut state = self.state.lock().unwrap();
|
|
state.metrics.load_ext_ms = library_load_time;
|
|
state.metrics.num_ext_downloaded = remote_ext_metrics.num_ext_downloaded;
|
|
state.metrics.largest_ext_size = remote_ext_metrics.largest_ext_size;
|
|
state.metrics.total_ext_download_size = remote_ext_metrics.total_ext_download_size;
|
|
info!(
|
|
"Loading shared_preload_libraries took {:?}ms",
|
|
library_load_time
|
|
);
|
|
info!("{:?}", remote_ext_metrics);
|
|
}
|
|
|
|
self.prepare_pgdata(&compute_state, extension_server_port)?;
|
|
|
|
let start_time = Utc::now();
|
|
let pg = self.start_postgres(pspec.storage_auth_token.clone())?;
|
|
|
|
let config_time = Utc::now();
|
|
if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates {
|
|
self.apply_config(&compute_state)?;
|
|
}
|
|
|
|
let startup_end_time = Utc::now();
|
|
{
|
|
let mut state = self.state.lock().unwrap();
|
|
state.metrics.start_postgres_ms = config_time
|
|
.signed_duration_since(start_time)
|
|
.to_std()
|
|
.unwrap()
|
|
.as_millis() as u64;
|
|
state.metrics.config_ms = startup_end_time
|
|
.signed_duration_since(config_time)
|
|
.to_std()
|
|
.unwrap()
|
|
.as_millis() as u64;
|
|
state.metrics.total_startup_ms = startup_end_time
|
|
.signed_duration_since(compute_state.start_time)
|
|
.to_std()
|
|
.unwrap()
|
|
.as_millis() as u64;
|
|
}
|
|
self.set_status(ComputeStatus::Running);
|
|
|
|
info!(
|
|
"finished configuration of compute for project {}",
|
|
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None")
|
|
);
|
|
|
|
// Log metrics so that we can search for slow operations in logs
|
|
let metrics = {
|
|
let state = self.state.lock().unwrap();
|
|
state.metrics.clone()
|
|
};
|
|
info!(?metrics, "compute start finished");
|
|
|
|
Ok(pg)
|
|
}
|
|
|
|
// Look for core dumps and collect backtraces.
|
|
//
|
|
// EKS worker nodes have following core dump settings:
|
|
// /proc/sys/kernel/core_pattern -> core
|
|
// /proc/sys/kernel/core_uses_pid -> 1
|
|
// ulimint -c -> unlimited
|
|
// which results in core dumps being written to postgres data directory as core.<pid>.
|
|
//
|
|
// Use that as a default location and pattern, except macos where core dumps are written
|
|
// to /cores/ directory by default.
|
|
pub fn check_for_core_dumps(&self) -> Result<()> {
|
|
let core_dump_dir = match std::env::consts::OS {
|
|
"macos" => Path::new("/cores/"),
|
|
_ => Path::new(&self.pgdata),
|
|
};
|
|
|
|
// Collect core dump paths if any
|
|
info!("checking for core dumps in {}", core_dump_dir.display());
|
|
let files = fs::read_dir(core_dump_dir)?;
|
|
let cores = files.filter_map(|entry| {
|
|
let entry = entry.ok()?;
|
|
let _ = entry.file_name().to_str()?.strip_prefix("core.")?;
|
|
Some(entry.path())
|
|
});
|
|
|
|
// Print backtrace for each core dump
|
|
for core_path in cores {
|
|
warn!(
|
|
"core dump found: {}, collecting backtrace",
|
|
core_path.display()
|
|
);
|
|
|
|
// Try first with gdb
|
|
let backtrace = Command::new("gdb")
|
|
.args(["--batch", "-q", "-ex", "bt", &self.pgbin])
|
|
.arg(&core_path)
|
|
.output();
|
|
|
|
// Try lldb if no gdb is found -- that is handy for local testing on macOS
|
|
let backtrace = match backtrace {
|
|
Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {
|
|
warn!("cannot find gdb, trying lldb");
|
|
Command::new("lldb")
|
|
.arg("-c")
|
|
.arg(&core_path)
|
|
.args(["--batch", "-o", "bt all", "-o", "quit"])
|
|
.output()
|
|
}
|
|
_ => backtrace,
|
|
}?;
|
|
|
|
warn!(
|
|
"core dump backtrace: {}",
|
|
String::from_utf8_lossy(&backtrace.stdout)
|
|
);
|
|
warn!(
|
|
"debugger stderr: {}",
|
|
String::from_utf8_lossy(&backtrace.stderr)
|
|
);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Select `pg_stat_statements` data and return it as a stringified JSON
|
|
pub async fn collect_insights(&self) -> String {
|
|
let mut result_rows: Vec<String> = Vec::new();
|
|
let connect_result = tokio_postgres::connect(self.connstr.as_str(), NoTls).await;
|
|
let (client, connection) = connect_result.unwrap();
|
|
tokio::spawn(async move {
|
|
if let Err(e) = connection.await {
|
|
eprintln!("connection error: {}", e);
|
|
}
|
|
});
|
|
let result = client
|
|
.simple_query(
|
|
"SELECT
|
|
row_to_json(pg_stat_statements)
|
|
FROM
|
|
pg_stat_statements
|
|
WHERE
|
|
userid != 'cloud_admin'::regrole::oid
|
|
ORDER BY
|
|
(mean_exec_time + mean_plan_time) DESC
|
|
LIMIT 100",
|
|
)
|
|
.await;
|
|
|
|
if let Ok(raw_rows) = result {
|
|
for message in raw_rows.iter() {
|
|
if let postgres::SimpleQueryMessage::Row(row) = message {
|
|
if let Some(json) = row.get(0) {
|
|
result_rows.push(json.to_string());
|
|
}
|
|
}
|
|
}
|
|
|
|
format!("{{\"pg_stat_statements\": [{}]}}", result_rows.join(","))
|
|
} else {
|
|
"{{\"pg_stat_statements\": []}}".to_string()
|
|
}
|
|
}
|
|
|
|
// download an archive, unzip and place files in correct locations
|
|
pub async fn download_extension(
|
|
&self,
|
|
real_ext_name: String,
|
|
ext_path: RemotePath,
|
|
) -> Result<u64, DownloadError> {
|
|
let remote_storage = self
|
|
.ext_remote_storage
|
|
.as_ref()
|
|
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
|
|
"Remote extensions storage is not configured",
|
|
)))?;
|
|
|
|
let ext_archive_name = ext_path.object_name().expect("bad path");
|
|
|
|
let mut first_try = false;
|
|
if !self
|
|
.ext_download_progress
|
|
.read()
|
|
.expect("lock err")
|
|
.contains_key(ext_archive_name)
|
|
{
|
|
self.ext_download_progress
|
|
.write()
|
|
.expect("lock err")
|
|
.insert(ext_archive_name.to_string(), (Utc::now(), false));
|
|
first_try = true;
|
|
}
|
|
let (download_start, download_completed) =
|
|
self.ext_download_progress.read().expect("lock err")[ext_archive_name];
|
|
let start_time_delta = Utc::now()
|
|
.signed_duration_since(download_start)
|
|
.to_std()
|
|
.unwrap()
|
|
.as_millis() as u64;
|
|
|
|
// how long to wait for extension download if it was started by another process
|
|
const HANG_TIMEOUT: u64 = 3000; // milliseconds
|
|
|
|
if download_completed {
|
|
info!("extension already downloaded, skipping re-download");
|
|
return Ok(0);
|
|
} else if start_time_delta < HANG_TIMEOUT && !first_try {
|
|
info!("download {ext_archive_name} already started by another process, hanging untill completion or timeout");
|
|
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
|
|
loop {
|
|
info!("waiting for download");
|
|
interval.tick().await;
|
|
let (_, download_completed_now) =
|
|
self.ext_download_progress.read().expect("lock")[ext_archive_name];
|
|
if download_completed_now {
|
|
info!("download finished by whoever else downloaded it");
|
|
return Ok(0);
|
|
}
|
|
}
|
|
// NOTE: the above loop will get terminated
|
|
// based on the timeout of the download function
|
|
}
|
|
|
|
// if extension hasn't been downloaded before or the previous
|
|
// attempt to download was at least HANG_TIMEOUT ms ago
|
|
// then we try to download it here
|
|
info!("downloading new extension {ext_archive_name}");
|
|
|
|
let download_size = extension_server::download_extension(
|
|
&real_ext_name,
|
|
&ext_path,
|
|
remote_storage,
|
|
&self.pgbin,
|
|
)
|
|
.await
|
|
.map_err(DownloadError::Other);
|
|
|
|
self.ext_download_progress
|
|
.write()
|
|
.expect("bad lock")
|
|
.insert(ext_archive_name.to_string(), (download_start, true));
|
|
|
|
download_size
|
|
}
|
|
|
|
#[tokio::main]
|
|
pub async fn prepare_preload_libraries(
|
|
&self,
|
|
spec: &ComputeSpec,
|
|
) -> Result<RemoteExtensionMetrics> {
|
|
if self.ext_remote_storage.is_none() {
|
|
return Ok(RemoteExtensionMetrics {
|
|
num_ext_downloaded: 0,
|
|
largest_ext_size: 0,
|
|
total_ext_download_size: 0,
|
|
});
|
|
}
|
|
let remote_extensions = spec
|
|
.remote_extensions
|
|
.as_ref()
|
|
.ok_or(anyhow::anyhow!("Remote extensions are not configured",))?;
|
|
|
|
info!("parse shared_preload_libraries from spec.cluster.settings");
|
|
let mut libs_vec = Vec::new();
|
|
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
|
|
libs_vec = libs
|
|
.split(&[',', '\'', ' '])
|
|
.filter(|s| *s != "neon" && !s.is_empty())
|
|
.map(str::to_string)
|
|
.collect();
|
|
}
|
|
info!("parse shared_preload_libraries from provided postgresql.conf");
|
|
|
|
// that is used in neon_local and python tests
|
|
if let Some(conf) = &spec.cluster.postgresql_conf {
|
|
let conf_lines = conf.split('\n').collect::<Vec<&str>>();
|
|
let mut shared_preload_libraries_line = "";
|
|
for line in conf_lines {
|
|
if line.starts_with("shared_preload_libraries") {
|
|
shared_preload_libraries_line = line;
|
|
}
|
|
}
|
|
let mut preload_libs_vec = Vec::new();
|
|
if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
|
|
preload_libs_vec = libs
|
|
.split(&[',', '\'', ' '])
|
|
.filter(|s| *s != "neon" && !s.is_empty())
|
|
.map(str::to_string)
|
|
.collect();
|
|
}
|
|
libs_vec.extend(preload_libs_vec);
|
|
}
|
|
|
|
// Don't try to download libraries that are not in the index.
|
|
// Assume that they are already present locally.
|
|
libs_vec.retain(|lib| remote_extensions.library_index.contains_key(lib));
|
|
|
|
info!("Downloading to shared preload libraries: {:?}", &libs_vec);
|
|
|
|
let mut download_tasks = Vec::new();
|
|
for library in &libs_vec {
|
|
let (ext_name, ext_path) =
|
|
remote_extensions.get_ext(library, true, &self.build_tag, &self.pgversion)?;
|
|
download_tasks.push(self.download_extension(ext_name, ext_path));
|
|
}
|
|
let results = join_all(download_tasks).await;
|
|
|
|
let mut remote_ext_metrics = RemoteExtensionMetrics {
|
|
num_ext_downloaded: 0,
|
|
largest_ext_size: 0,
|
|
total_ext_download_size: 0,
|
|
};
|
|
for result in results {
|
|
let download_size = match result {
|
|
Ok(res) => {
|
|
remote_ext_metrics.num_ext_downloaded += 1;
|
|
res
|
|
}
|
|
Err(err) => {
|
|
// if we failed to download an extension, we don't want to fail the whole
|
|
// process, but we do want to log the error
|
|
error!("Failed to download extension: {}", err);
|
|
0
|
|
}
|
|
};
|
|
|
|
remote_ext_metrics.largest_ext_size =
|
|
std::cmp::max(remote_ext_metrics.largest_ext_size, download_size);
|
|
remote_ext_metrics.total_ext_download_size += download_size;
|
|
}
|
|
Ok(remote_ext_metrics)
|
|
}
|
|
}
|