mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 20:50:37 +00:00
Compare commits
4 Commits
release-pr
...
tristan957
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d7d3fb332f | ||
|
|
c37ce9b69c | ||
|
|
681edf3983 | ||
|
|
ab898e40b0 |
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -1278,10 +1278,13 @@ dependencies = [
|
||||
"chrono",
|
||||
"indexmap 2.9.0",
|
||||
"jsonwebtoken",
|
||||
"postgres",
|
||||
"regex",
|
||||
"remote_storage",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio-postgres",
|
||||
"url",
|
||||
"utils",
|
||||
]
|
||||
|
||||
@@ -6747,6 +6750,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"clap",
|
||||
"clashmap",
|
||||
"compute_api",
|
||||
"control_plane",
|
||||
"cron",
|
||||
"diesel",
|
||||
|
||||
@@ -145,28 +145,47 @@ impl Cli {
|
||||
}
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
impl Cli {
|
||||
pub fn get_config(&self) -> Result<ComputeConfig> {
|
||||
// First, read the config from the path if provided
|
||||
if let Some(ref config) = self.config {
|
||||
let file = File::open(config)?;
|
||||
return Ok(serde_json::from_reader(&file)?);
|
||||
}
|
||||
|
||||
// If the config wasn't provided in the CLI arguments, then retrieve it from
|
||||
// the control plane
|
||||
match get_config_from_control_plane(
|
||||
self.control_plane_uri.as_ref().unwrap(),
|
||||
&self.compute_id,
|
||||
) {
|
||||
Ok(config) => Ok(config),
|
||||
Err(e) => {
|
||||
error!(
|
||||
"cannot get response from control plane: {}\n\
|
||||
neither spec nor confirmation that compute is in the Empty state was received",
|
||||
e
|
||||
);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let cli = Cli::parse();
|
||||
|
||||
let scenario = failpoint_support::init();
|
||||
|
||||
// For historical reasons, the main thread that processes the config and launches postgres
|
||||
// is synchronous, but we always have this tokio runtime available and we "enter" it so
|
||||
// that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
|
||||
// from all parts of compute_ctl.
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
let _rt_guard = runtime.enter();
|
||||
|
||||
runtime.block_on(init())?;
|
||||
init().await?;
|
||||
|
||||
// enable core dumping for all child processes
|
||||
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
|
||||
|
||||
let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
|
||||
|
||||
let config = get_config(&cli)?;
|
||||
let config = cli.get_config()?;
|
||||
|
||||
let compute_node = ComputeNode::new(
|
||||
ComputeNodeParams {
|
||||
@@ -191,7 +210,7 @@ fn main() -> Result<()> {
|
||||
config,
|
||||
)?;
|
||||
|
||||
let exit_code = compute_node.run()?;
|
||||
let exit_code = compute_node.run().await?;
|
||||
|
||||
scenario.teardown();
|
||||
|
||||
@@ -213,28 +232,6 @@ async fn init() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_config(cli: &Cli) -> Result<ComputeConfig> {
|
||||
// First, read the config from the path if provided
|
||||
if let Some(ref config) = cli.config {
|
||||
let file = File::open(config)?;
|
||||
return Ok(serde_json::from_reader(&file)?);
|
||||
}
|
||||
|
||||
// If the config wasn't provided in the CLI arguments, then retrieve it from
|
||||
// the control plane
|
||||
match get_config_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
|
||||
Ok(config) => Ok(config),
|
||||
Err(e) => {
|
||||
error!(
|
||||
"cannot get response from control plane: {}\n\
|
||||
neither spec nor confirmation that compute is in the Empty state was received",
|
||||
e
|
||||
);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn deinit_and_exit(exit_code: Option<i32>) -> ! {
|
||||
// Shutdown trace pipeline gracefully, so that it has a chance to send any
|
||||
// pending traces before we exit. Shutting down OTEL tracing provider may
|
||||
|
||||
@@ -15,12 +15,8 @@ use itertools::Itertools;
|
||||
use nix::sys::signal::{Signal, kill};
|
||||
use nix::unistd::Pid;
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres;
|
||||
use postgres::NoTls;
|
||||
use postgres::error::SqlState;
|
||||
use remote_storage::{DownloadError, RemotePath};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::net::SocketAddr;
|
||||
use std::os::unix::fs::{PermissionsExt, symlink};
|
||||
use std::path::Path;
|
||||
use std::process::{Command, Stdio};
|
||||
@@ -30,9 +26,9 @@ use std::sync::{Arc, Condvar, Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{env, fs};
|
||||
use tokio::spawn;
|
||||
use tokio_postgres::{NoTls, error::SqlState};
|
||||
use tracing::{Instrument, debug, error, info, instrument, warn};
|
||||
use url::Url;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::measured_stream::MeasuredReader;
|
||||
|
||||
@@ -144,7 +140,7 @@ pub struct ComputeState {
|
||||
|
||||
/// Compute spec. This can be received from the CLI or - more likely -
|
||||
/// passed by the control plane with a /configure HTTP request.
|
||||
pub pspec: Option<ParsedSpec>,
|
||||
pub spec: Option<ComputeSpec>,
|
||||
|
||||
/// If the spec is passed by a /configure request, 'startup_span' is the
|
||||
/// /configure request's tracing span. The main thread enters it when it
|
||||
@@ -171,7 +167,7 @@ impl ComputeState {
|
||||
status: ComputeStatus::Empty,
|
||||
last_active: None,
|
||||
error: None,
|
||||
pspec: None,
|
||||
spec: None,
|
||||
startup_span: None,
|
||||
metrics: ComputeMetrics::default(),
|
||||
lfc_prewarm_state: LfcPrewarmState::default(),
|
||||
@@ -203,94 +199,6 @@ impl Default for ComputeState {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ParsedSpec {
|
||||
pub spec: ComputeSpec,
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub pageserver_connstr: String,
|
||||
pub safekeeper_connstrings: Vec<String>,
|
||||
pub storage_auth_token: Option<String>,
|
||||
pub endpoint_storage_addr: Option<SocketAddr>,
|
||||
pub endpoint_storage_token: Option<String>,
|
||||
}
|
||||
|
||||
impl TryFrom<ComputeSpec> for ParsedSpec {
|
||||
type Error = String;
|
||||
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
|
||||
// Extract the options from the spec file that are needed to connect to
|
||||
// the storage system.
|
||||
//
|
||||
// For backwards-compatibility, the top-level fields in the spec file
|
||||
// may be empty. In that case, we need to dig them from the GUCs in the
|
||||
// cluster.settings field.
|
||||
let pageserver_connstr = spec
|
||||
.pageserver_connstring
|
||||
.clone()
|
||||
.or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
|
||||
.ok_or("pageserver connstr should be provided")?;
|
||||
let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() {
|
||||
if matches!(spec.mode, ComputeMode::Primary) {
|
||||
spec.cluster
|
||||
.settings
|
||||
.find("neon.safekeepers")
|
||||
.ok_or("safekeeper connstrings should be provided")?
|
||||
.split(',')
|
||||
.map(|str| str.to_string())
|
||||
.collect()
|
||||
} else {
|
||||
vec![]
|
||||
}
|
||||
} else {
|
||||
spec.safekeeper_connstrings.clone()
|
||||
};
|
||||
let storage_auth_token = spec.storage_auth_token.clone();
|
||||
let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
|
||||
tenant_id
|
||||
} else {
|
||||
spec.cluster
|
||||
.settings
|
||||
.find("neon.tenant_id")
|
||||
.ok_or("tenant id should be provided")
|
||||
.map(|s| TenantId::from_str(&s))?
|
||||
.or(Err("invalid tenant id"))?
|
||||
};
|
||||
let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
|
||||
timeline_id
|
||||
} else {
|
||||
spec.cluster
|
||||
.settings
|
||||
.find("neon.timeline_id")
|
||||
.ok_or("timeline id should be provided")
|
||||
.map(|s| TimelineId::from_str(&s))?
|
||||
.or(Err("invalid timeline id"))?
|
||||
};
|
||||
|
||||
let endpoint_storage_addr: Option<SocketAddr> = spec
|
||||
.endpoint_storage_addr
|
||||
.clone()
|
||||
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_addr"))
|
||||
.unwrap_or_default()
|
||||
.parse()
|
||||
.ok();
|
||||
let endpoint_storage_token = spec
|
||||
.endpoint_storage_token
|
||||
.clone()
|
||||
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_token"));
|
||||
|
||||
Ok(ParsedSpec {
|
||||
spec,
|
||||
pageserver_connstr,
|
||||
safekeeper_connstrings,
|
||||
storage_auth_token,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
endpoint_storage_addr,
|
||||
endpoint_storage_token,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// If we are a VM, returns a [`Command`] that will run in the `neon-postgres`
|
||||
/// cgroup. Otherwise returns the default `Command::new(cmd)`
|
||||
///
|
||||
@@ -368,10 +276,7 @@ impl ComputeNode {
|
||||
tokio_conn_conf.options(&options);
|
||||
|
||||
let mut new_state = ComputeState::new();
|
||||
if let Some(spec) = config.spec {
|
||||
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
new_state.pspec = Some(pspec);
|
||||
}
|
||||
new_state.spec = config.spec;
|
||||
|
||||
Ok(ComputeNode {
|
||||
params,
|
||||
@@ -386,10 +291,10 @@ impl ComputeNode {
|
||||
|
||||
/// Top-level control flow of compute_ctl. Returns a process exit code we should
|
||||
/// exit with.
|
||||
pub fn run(self) -> Result<Option<i32>> {
|
||||
pub async fn run(self) -> Result<Option<i32>> {
|
||||
let this = Arc::new(self);
|
||||
|
||||
let cli_spec = this.state.lock().unwrap().pspec.clone();
|
||||
let cli_spec = this.state.lock().unwrap().spec.clone();
|
||||
|
||||
// If this is a pooled VM, prewarm before starting HTTP server and becoming
|
||||
// available for binding. Prewarming helps Postgres start quicker later,
|
||||
@@ -425,7 +330,7 @@ impl ComputeNode {
|
||||
|
||||
// If we got a spec from the CLI already, use that. Otherwise wait for the
|
||||
// control plane to pass it to us with a /configure HTTP request
|
||||
let pspec = if let Some(cli_spec) = cli_spec {
|
||||
let spec = if let Some(cli_spec) = cli_spec {
|
||||
cli_spec
|
||||
} else {
|
||||
this.wait_spec()?
|
||||
@@ -438,11 +343,11 @@ impl ComputeNode {
|
||||
let mut vm_monitor = None;
|
||||
let mut pg_process: Option<PostgresHandle> = None;
|
||||
|
||||
match this.start_compute(&mut pg_process) {
|
||||
match this.start_compute(&mut pg_process).await {
|
||||
Ok(()) => {
|
||||
// Success! Launch remaining services (just vm-monitor currently)
|
||||
vm_monitor =
|
||||
Some(this.start_vm_monitor(pspec.spec.disable_lfc_resizing.unwrap_or(false)));
|
||||
Some(this.start_vm_monitor(spec.disable_lfc_resizing.unwrap_or(false)));
|
||||
}
|
||||
Err(err) => {
|
||||
// Something went wrong with the startup. Log it and expose the error to
|
||||
@@ -487,7 +392,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
// Reap the postgres process
|
||||
delay_exit |= this.cleanup_after_postgres_exit()?;
|
||||
delay_exit |= this.cleanup_after_postgres_exit().await?;
|
||||
|
||||
// If launch failed, keep serving HTTP requests for a while, so the cloud
|
||||
// control plane can get the actual error.
|
||||
@@ -498,7 +403,7 @@ impl ComputeNode {
|
||||
Ok(exit_code)
|
||||
}
|
||||
|
||||
pub fn wait_spec(&self) -> Result<ParsedSpec> {
|
||||
pub fn wait_spec(&self) -> Result<ComputeSpec> {
|
||||
info!("no compute spec provided, waiting");
|
||||
let mut state = self.state.lock().unwrap();
|
||||
while state.status != ComputeStatus::ConfigurationPending {
|
||||
@@ -506,7 +411,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
info!("got spec, continue configuration");
|
||||
let spec = state.pspec.as_ref().unwrap().clone();
|
||||
let spec = state.spec.as_ref().unwrap().clone();
|
||||
|
||||
// Record for how long we slept waiting for the spec.
|
||||
let now = Utc::now();
|
||||
@@ -539,7 +444,7 @@ impl ComputeNode {
|
||||
///
|
||||
/// Note that this is in the critical path of a compute cold start. Keep this fast.
|
||||
/// Try to do things concurrently, to hide the latencies.
|
||||
fn start_compute(self: &Arc<Self>, pg_handle: &mut Option<PostgresHandle>) -> Result<()> {
|
||||
async fn start_compute(self: &Arc<Self>, pg_handle: &mut Option<PostgresHandle>) -> Result<()> {
|
||||
let compute_state: ComputeState;
|
||||
|
||||
let start_compute_span;
|
||||
@@ -574,18 +479,17 @@ impl ComputeNode {
|
||||
compute_state = state_guard.clone()
|
||||
}
|
||||
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = compute_state.spec.as_ref().expect("spec must be set");
|
||||
info!(
|
||||
"starting compute for project {}, operation {}, tenant {}, timeline {}, project {}, branch {}, endpoint {}, features {:?}, spec.remote_extensions {:?}",
|
||||
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
|
||||
pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
|
||||
pspec.tenant_id,
|
||||
pspec.timeline_id,
|
||||
pspec.spec.project_id.as_deref().unwrap_or("None"),
|
||||
pspec.spec.branch_id.as_deref().unwrap_or("None"),
|
||||
pspec.spec.endpoint_id.as_deref().unwrap_or("None"),
|
||||
pspec.spec.features,
|
||||
pspec.spec.remote_extensions,
|
||||
"starting compute for operation {}, tenant {}, timeline {}, project {}, branch {}, endpoint {}, features {:?}, spec.remote_extensions {:?}",
|
||||
spec.operation_uuid.as_deref().unwrap_or("None"),
|
||||
spec.tenant_id,
|
||||
spec.timeline_id,
|
||||
spec.project_id,
|
||||
spec.branch_id,
|
||||
spec.endpoint_id,
|
||||
spec.features,
|
||||
spec.remote_extensions,
|
||||
);
|
||||
|
||||
////// PRE-STARTUP PHASE: things that need to be finished before we start the Postgres process
|
||||
@@ -606,8 +510,8 @@ impl ComputeNode {
|
||||
let tls_config = self.tls_config(&pspec.spec);
|
||||
|
||||
// If there are any remote extensions in shared_preload_libraries, start downloading them
|
||||
if pspec.spec.remote_extensions.is_some() {
|
||||
let (this, spec) = (self.clone(), pspec.spec.clone());
|
||||
if spec.remote_extensions.is_some() {
|
||||
let (this, spec) = (self.clone(), spec.clone());
|
||||
pre_tasks.spawn(async move {
|
||||
this.download_preload_extensions(&spec)
|
||||
.in_current_span()
|
||||
@@ -618,13 +522,11 @@ impl ComputeNode {
|
||||
// Prepare pgdata directory. This downloads the basebackup, among other things.
|
||||
{
|
||||
let (this, cs) = (self.clone(), compute_state.clone());
|
||||
pre_tasks.spawn_blocking_child(move || this.prepare_pgdata(&cs));
|
||||
pre_tasks.spawn(async move { this.prepare_pgdata(&cs).await });
|
||||
}
|
||||
|
||||
// Resize swap to the desired size if the compute spec says so
|
||||
if let (Some(size_bytes), true) =
|
||||
(pspec.spec.swap_size_bytes, self.params.resize_swap_on_bind)
|
||||
{
|
||||
if let (Some(size_bytes), true) = (spec.swap_size_bytes, self.params.resize_swap_on_bind) {
|
||||
pre_tasks.spawn_blocking_child(move || {
|
||||
// To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
|
||||
// *before* starting postgres.
|
||||
@@ -642,7 +544,7 @@ impl ComputeNode {
|
||||
|
||||
// Set disk quota if the compute spec says so
|
||||
if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) = (
|
||||
pspec.spec.disk_quota_bytes,
|
||||
spec.disk_quota_bytes,
|
||||
self.params.set_disk_quota_for_fs.as_ref(),
|
||||
) {
|
||||
let disk_quota_fs_mountpoint = disk_quota_fs_mountpoint.clone();
|
||||
@@ -657,7 +559,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
// tune pgbouncer
|
||||
if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
|
||||
if let Some(pgbouncer_settings) = &spec.pgbouncer_settings {
|
||||
info!("tuning pgbouncer");
|
||||
|
||||
let pgbouncer_settings = pgbouncer_settings.clone();
|
||||
@@ -675,7 +577,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
// configure local_proxy
|
||||
if let Some(local_proxy) = &pspec.spec.local_proxy_config {
|
||||
if let Some(local_proxy) = &spec.local_proxy_config {
|
||||
info!("configuring local_proxy");
|
||||
|
||||
// Spawn a background task to do the configuration,
|
||||
@@ -693,7 +595,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
// Configure and start rsyslog for compliance audit logging
|
||||
match pspec.spec.audit_log_level {
|
||||
match spec.audit_log_level {
|
||||
ComputeAudit::Hipaa | ComputeAudit::Extended | ComputeAudit::Full => {
|
||||
let remote_endpoint =
|
||||
std::env::var("AUDIT_LOGGING_ENDPOINT").unwrap_or("".to_string());
|
||||
@@ -704,16 +606,10 @@ impl ComputeNode {
|
||||
let log_directory_path = Path::new(&self.params.pgdata).join("log");
|
||||
let log_directory_path = log_directory_path.to_string_lossy().to_string();
|
||||
|
||||
// Add project_id,endpoint_id to identify the logs.
|
||||
//
|
||||
// These ids are passed from cplane,
|
||||
let endpoint_id = pspec.spec.endpoint_id.as_deref().unwrap_or("");
|
||||
let project_id = pspec.spec.project_id.as_deref().unwrap_or("");
|
||||
|
||||
configure_audit_rsyslog(
|
||||
log_directory_path.clone(),
|
||||
endpoint_id,
|
||||
project_id,
|
||||
&spec.endpoint_id,
|
||||
&spec.project_id,
|
||||
&remote_endpoint,
|
||||
)?;
|
||||
|
||||
@@ -724,7 +620,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
// Configure and start rsyslog for Postgres logs export
|
||||
let conf = PostgresLogsRsyslogConfig::new(pspec.spec.logs_export_host.as_deref());
|
||||
let conf = PostgresLogsRsyslogConfig::new(spec.logs_export_host.as_deref());
|
||||
configure_postgres_logs_export(conf)?;
|
||||
|
||||
// Launch remaining service threads
|
||||
@@ -732,21 +628,20 @@ impl ComputeNode {
|
||||
let _configurator_handle = launch_configurator(self);
|
||||
|
||||
// Wait for all the pre-tasks to finish before starting postgres
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
while let Some(res) = rt.block_on(pre_tasks.join_next()) {
|
||||
while let Some(res) = pre_tasks.join_next().await {
|
||||
res??;
|
||||
}
|
||||
|
||||
////// START POSTGRES
|
||||
let start_time = Utc::now();
|
||||
let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
|
||||
let pg_process = self.start_postgres(spec.storage_auth_token.clone())?;
|
||||
let postmaster_pid = pg_process.pid();
|
||||
*pg_handle = Some(pg_process);
|
||||
|
||||
// If this is a primary endpoint, perform some post-startup configuration before
|
||||
// opening it up for the world.
|
||||
let config_time = Utc::now();
|
||||
if pspec.spec.mode == ComputeMode::Primary {
|
||||
if spec.mode == ComputeMode::Primary {
|
||||
self.configure_as_primary(&compute_state)?;
|
||||
|
||||
let conf = self.get_tokio_conn_conf(None);
|
||||
@@ -787,6 +682,7 @@ impl ComputeNode {
|
||||
if pspec.spec.autoprewarm {
|
||||
self.prewarm_lfc();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -862,14 +758,14 @@ impl ComputeNode {
|
||||
}
|
||||
}
|
||||
|
||||
fn cleanup_after_postgres_exit(&self) -> Result<bool> {
|
||||
async fn cleanup_after_postgres_exit(&self) -> Result<bool> {
|
||||
// Maybe sync safekeepers again, to speed up next startup
|
||||
let compute_state = self.state.lock().unwrap().clone();
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
|
||||
let spec = compute_state.spec.as_ref().expect("spec must be set");
|
||||
if matches!(spec.mode, compute_api::spec::ComputeMode::Primary) {
|
||||
info!("syncing safekeepers on shutdown");
|
||||
let storage_auth_token = pspec.storage_auth_token.clone();
|
||||
let lsn = self.sync_safekeepers(storage_auth_token)?;
|
||||
let storage_auth_token = spec.storage_auth_token.clone();
|
||||
let lsn = self.sync_safekeepers(storage_auth_token).await?;
|
||||
info!("synced safekeepers at lsn {lsn}");
|
||||
}
|
||||
|
||||
@@ -892,13 +788,12 @@ impl ComputeNode {
|
||||
|
||||
/// Check that compute node has corresponding feature enabled.
|
||||
pub fn has_feature(&self, feature: ComputeFeature) -> bool {
|
||||
let state = self.state.lock().unwrap();
|
||||
|
||||
if let Some(s) = state.pspec.as_ref() {
|
||||
s.spec.features.contains(&feature)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
self.state
|
||||
.lock()
|
||||
.unwrap()
|
||||
.spec
|
||||
.as_ref()
|
||||
.is_some_and(|spec| spec.features.contains(&feature))
|
||||
}
|
||||
|
||||
pub fn set_status(&self, status: ComputeStatus) {
|
||||
@@ -915,13 +810,15 @@ impl ComputeNode {
|
||||
self.state.lock().unwrap().status
|
||||
}
|
||||
|
||||
pub fn get_timeline_id(&self) -> Option<TimelineId> {
|
||||
pub fn get_timeline_id(&self) -> TimelineId {
|
||||
self.state
|
||||
.lock()
|
||||
.unwrap()
|
||||
.pspec
|
||||
.spec
|
||||
.as_ref()
|
||||
.map(|s| s.timeline_id)
|
||||
.unwrap()
|
||||
.timeline_id
|
||||
.clone()
|
||||
}
|
||||
|
||||
// Remove `pgdata` directory and create it again with right permissions.
|
||||
@@ -940,11 +837,10 @@ impl ComputeNode {
|
||||
// unarchive it to `pgdata` directory overriding all its previous content.
|
||||
#[instrument(skip_all, fields(%lsn))]
|
||||
fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
|
||||
let spec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = compute_state.spec.as_ref().expect("spec must be set");
|
||||
let start_time = Instant::now();
|
||||
|
||||
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
|
||||
let mut config = postgres::Config::from_str(shard0_connstr)?;
|
||||
let mut config = postgres::Config::from(&spec.pageservers[0]);
|
||||
|
||||
// Use the storage auth token from the config file, if given.
|
||||
// Note: this overrides any password set in the connection string.
|
||||
@@ -956,20 +852,17 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
config.application_name("compute_ctl");
|
||||
if let Some(spec) = &compute_state.pspec {
|
||||
config.options(&format!(
|
||||
"-c neon.compute_mode={}",
|
||||
spec.spec.mode.to_type_str()
|
||||
));
|
||||
if let Some(spec) = &compute_state.spec {
|
||||
config.options(&format!("-c neon.compute_mode={}", spec.mode.to_type_str()));
|
||||
}
|
||||
|
||||
// Connect to pageserver
|
||||
let mut client = config.connect(NoTls)?;
|
||||
let mut client = config.connect(postgres::NoTls)?;
|
||||
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
|
||||
|
||||
let basebackup_cmd = match lsn {
|
||||
Lsn(0) => {
|
||||
if spec.spec.mode != ComputeMode::Primary {
|
||||
if spec.mode != ComputeMode::Primary {
|
||||
format!(
|
||||
"basebackup {} {} --gzip --replica",
|
||||
spec.tenant_id, spec.timeline_id
|
||||
@@ -979,7 +872,7 @@ impl ComputeNode {
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
if spec.spec.mode != ComputeMode::Primary {
|
||||
if spec.mode != ComputeMode::Primary {
|
||||
format!(
|
||||
"basebackup {} {} {} --gzip --replica",
|
||||
spec.tenant_id, spec.timeline_id, lsn
|
||||
@@ -1055,35 +948,34 @@ impl ComputeNode {
|
||||
compute_state: &ComputeState,
|
||||
) -> Result<Option<Lsn>> {
|
||||
// Construct a connection config for each safekeeper
|
||||
let pspec: ParsedSpec = compute_state
|
||||
.pspec
|
||||
let spec = compute_state
|
||||
.spec
|
||||
.as_ref()
|
||||
.expect("spec must be set")
|
||||
.clone();
|
||||
let sk_connstrs: Vec<String> = pspec.safekeeper_connstrings.clone();
|
||||
let sk_configs = sk_connstrs.into_iter().map(|connstr| {
|
||||
// Format connstr
|
||||
let id = connstr.clone();
|
||||
let connstr = format!("postgresql://no_user@{}", connstr);
|
||||
let options = format!(
|
||||
"-c timeline_id={} tenant_id={}",
|
||||
pspec.timeline_id, pspec.tenant_id
|
||||
);
|
||||
let safekeepers = spec
|
||||
.safekeepers
|
||||
.iter()
|
||||
.map(|s| {
|
||||
let mut config = tokio_postgres::Config::from(s);
|
||||
|
||||
// Construct client
|
||||
let mut config = tokio_postgres::Config::from_str(&connstr).unwrap();
|
||||
config.options(&options);
|
||||
if let Some(storage_auth_token) = pspec.storage_auth_token.clone() {
|
||||
config.password(storage_auth_token);
|
||||
}
|
||||
config.user("no_user");
|
||||
config.options(&format!(
|
||||
"-c timeline_id={} tenant_id={}",
|
||||
spec.timeline_id, spec.tenant_id
|
||||
));
|
||||
if let Some(storage_auth_token) = &spec.storage_auth_token {
|
||||
config.password(storage_auth_token);
|
||||
}
|
||||
|
||||
(id, config)
|
||||
});
|
||||
(format!("{}:{}", s.host, s.port), config)
|
||||
})
|
||||
.collect::<Vec<(String, tokio_postgres::Config)>>();
|
||||
|
||||
// Create task set to query all safekeepers
|
||||
let mut tasks = FuturesUnordered::new();
|
||||
let quorum = sk_configs.len() / 2 + 1;
|
||||
for (id, config) in sk_configs {
|
||||
let quorum = safekeepers.len() / 2 + 1;
|
||||
for (id, config) in safekeepers {
|
||||
let timeout = tokio::time::Duration::from_millis(100);
|
||||
let task = tokio::time::timeout(timeout, ping_safekeeper(id, config));
|
||||
tasks.push(tokio::spawn(task));
|
||||
@@ -1128,11 +1020,13 @@ impl ComputeNode {
|
||||
// Fast path for sync_safekeepers. If they're already synced we get the lsn
|
||||
// in one roundtrip. If not, we should do a full sync_safekeepers.
|
||||
#[instrument(skip_all)]
|
||||
pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
|
||||
pub async fn check_safekeepers_synced(
|
||||
&self,
|
||||
compute_state: &ComputeState,
|
||||
) -> Result<Option<Lsn>> {
|
||||
let start_time = Utc::now();
|
||||
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
|
||||
let result = self.check_safekeepers_synced_async(compute_state).await;
|
||||
|
||||
// Record runtime
|
||||
self.state.lock().unwrap().metrics.sync_sk_check_ms = Utc::now()
|
||||
@@ -1146,7 +1040,7 @@ impl ComputeNode {
|
||||
// Run `postgres` in a special mode with `--sync-safekeepers` argument
|
||||
// and return the reported LSN back to the caller.
|
||||
#[instrument(skip_all)]
|
||||
pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
|
||||
pub async fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
|
||||
let start_time = Utc::now();
|
||||
|
||||
let mut sync_handle = maybe_cgexec(&self.params.pgbin)
|
||||
@@ -1178,8 +1072,8 @@ impl ComputeNode {
|
||||
SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst);
|
||||
|
||||
// Process has exited, so we can join the logs thread.
|
||||
let _ = tokio::runtime::Handle::current()
|
||||
.block_on(logs_handle)
|
||||
let _ = logs_handle
|
||||
.await
|
||||
.map_err(|e| tracing::error!("log task panicked: {:?}", e));
|
||||
|
||||
if !sync_output.status.success() {
|
||||
@@ -1205,9 +1099,8 @@ impl ComputeNode {
|
||||
/// Do all the preparations like PGDATA directory creation, configuration,
|
||||
/// safekeepers sync, basebackup, etc.
|
||||
#[instrument(skip_all)]
|
||||
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = &pspec.spec;
|
||||
pub async fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
let spec = compute_state.spec.as_ref().expect("spec must be set");
|
||||
let pgdata_path = Path::new(&self.params.pgdata);
|
||||
|
||||
let tls_config = self.tls_config(&pspec.spec);
|
||||
@@ -1216,7 +1109,7 @@ impl ComputeNode {
|
||||
self.create_pgdata()?;
|
||||
config::write_postgres_conf(
|
||||
pgdata_path,
|
||||
&pspec.spec,
|
||||
spec,
|
||||
self.params.internal_http_port,
|
||||
tls_config,
|
||||
)?;
|
||||
@@ -1227,11 +1120,13 @@ impl ComputeNode {
|
||||
let lsn = match spec.mode {
|
||||
ComputeMode::Primary => {
|
||||
info!("checking if safekeepers are synced");
|
||||
let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state) {
|
||||
let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state).await
|
||||
{
|
||||
lsn
|
||||
} else {
|
||||
info!("starting safekeepers syncing");
|
||||
self.sync_safekeepers(pspec.storage_auth_token.clone())
|
||||
self.sync_safekeepers(spec.storage_auth_token.clone())
|
||||
.await
|
||||
.with_context(|| "failed to sync safekeepers")?
|
||||
};
|
||||
info!("safekeepers synced at LSN {}", lsn);
|
||||
@@ -1248,13 +1143,13 @@ impl ComputeNode {
|
||||
};
|
||||
|
||||
info!(
|
||||
"getting basebackup@{} from pageserver {}",
|
||||
lsn, &pspec.pageserver_connstr
|
||||
"getting basebackup@{} from pageserver {}:{}",
|
||||
lsn, spec.pageservers[0].host, spec.pageservers[0].port
|
||||
);
|
||||
self.get_basebackup(compute_state, lsn).with_context(|| {
|
||||
format!(
|
||||
"failed to get basebackup@{} from pageserver {}",
|
||||
lsn, &pspec.pageserver_connstr
|
||||
"failed to get basebackup@{} from pageserver {}:{}",
|
||||
lsn, spec.pageservers[0].host, spec.pageservers[0].port
|
||||
)
|
||||
})?;
|
||||
|
||||
@@ -1536,10 +1431,9 @@ impl ComputeNode {
|
||||
let conf = Arc::new(conf);
|
||||
let spec = Arc::new(
|
||||
compute_state
|
||||
.pspec
|
||||
.spec
|
||||
.as_ref()
|
||||
.expect("spec must be set")
|
||||
.spec
|
||||
.clone(),
|
||||
);
|
||||
|
||||
@@ -1608,7 +1502,7 @@ impl ComputeNode {
|
||||
/// as it's used to reconfigure a previously started and configured Postgres node.
|
||||
#[instrument(skip_all)]
|
||||
pub fn reconfigure(&self) -> Result<()> {
|
||||
let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
|
||||
let spec = self.state.lock().unwrap().spec.as_ref().unwrap().clone();
|
||||
|
||||
let tls_config = self.tls_config(&spec);
|
||||
|
||||
@@ -1690,10 +1584,10 @@ impl ComputeNode {
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn configure_as_primary(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = compute_state.spec.as_ref().expect("spec must be set");
|
||||
|
||||
assert!(pspec.spec.mode == ComputeMode::Primary);
|
||||
if !pspec.spec.skip_pg_catalog_updates {
|
||||
assert!(spec.mode == ComputeMode::Primary);
|
||||
if !spec.skip_pg_catalog_updates {
|
||||
let pgdata_path = Path::new(&self.params.pgdata);
|
||||
// temporarily reset max_cluster_size in config
|
||||
// to avoid the possibility of hitting the limit, while we are applying config:
|
||||
@@ -2189,24 +2083,23 @@ LIMIT 100",
|
||||
/// the pageserver connection strings has changed.
|
||||
///
|
||||
/// The operation will time out after a specified duration.
|
||||
pub fn wait_timeout_while_pageserver_connstr_unchanged(&self, duration: Duration) {
|
||||
pub fn wait_timeout_while_pageservers_unchanged(&self, duration: Duration) {
|
||||
let state = self.state.lock().unwrap();
|
||||
let old_pageserver_connstr = state
|
||||
.pspec
|
||||
let old_pageservers = state
|
||||
.spec
|
||||
.as_ref()
|
||||
.expect("spec must be set")
|
||||
.pageserver_connstr
|
||||
.pageservers
|
||||
.clone();
|
||||
let mut unchanged = true;
|
||||
let _ = self
|
||||
.state_changed
|
||||
.wait_timeout_while(state, duration, |s| {
|
||||
let pageserver_connstr = &s
|
||||
.pspec
|
||||
.as_ref()
|
||||
.expect("spec must be set")
|
||||
.pageserver_connstr;
|
||||
unchanged = pageserver_connstr == &old_pageserver_connstr;
|
||||
let current_pageservers = &s.spec.as_ref().expect("spec must be set").pageservers;
|
||||
unchanged = current_pageservers
|
||||
.iter()
|
||||
.zip(&old_pageservers)
|
||||
.all(|(c, o)| c == o);
|
||||
unchanged
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
@@ -3,6 +3,7 @@ use anyhow::{Context, Result, bail};
|
||||
use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
|
||||
use compute_api::responses::LfcOffloadState;
|
||||
use compute_api::responses::LfcPrewarmState;
|
||||
use compute_api::spec::ComputeSpec;
|
||||
use http::StatusCode;
|
||||
use reqwest::Client;
|
||||
use std::sync::Arc;
|
||||
@@ -25,24 +26,30 @@ struct EndpointStoragePair {
|
||||
}
|
||||
|
||||
const KEY: &str = "lfc_state";
|
||||
impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
|
||||
impl TryFrom<&ComputeSpec> for EndpointStoragePair {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
|
||||
let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
|
||||
bail!("pspec.endpoint_id missing")
|
||||
fn try_from(spec: &ComputeSpec) -> Result<Self, Self::Error> {
|
||||
let Some(ref addr) = spec.endpoint_storage_addr else {
|
||||
bail!("spec.endpoint_storage_addr missing")
|
||||
};
|
||||
let Some(ref base_uri) = pspec.endpoint_storage_addr else {
|
||||
bail!("pspec.endpoint_storage_addr missing")
|
||||
};
|
||||
let tenant_id = pspec.tenant_id;
|
||||
let timeline_id = pspec.timeline_id;
|
||||
|
||||
let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}");
|
||||
let Some(ref token) = pspec.endpoint_storage_token else {
|
||||
bail!("pspec.endpoint_storage_token missing")
|
||||
let url = format!(
|
||||
"http://{addr}/{tenant_id}/{timeline_id}/{endpoint_id}/{key}",
|
||||
addr = addr,
|
||||
tenant_id = spec.tenant_id,
|
||||
timeline_id = spec.timeline_id,
|
||||
endpoint_id = spec.endpoint_id,
|
||||
key = KEY
|
||||
);
|
||||
|
||||
let Some(ref token) = spec.endpoint_storage_token else {
|
||||
bail!("spec.endpoint_storage_token missing")
|
||||
};
|
||||
let token = token.clone();
|
||||
Ok(EndpointStoragePair { url, token })
|
||||
|
||||
Ok(EndpointStoragePair {
|
||||
url,
|
||||
token: token.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,7 +118,7 @@ impl ComputeNode {
|
||||
|
||||
fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
|
||||
let state = self.state.lock().unwrap();
|
||||
state.pspec.as_ref().unwrap().try_into()
|
||||
state.spec.as_ref().unwrap().try_into()
|
||||
}
|
||||
|
||||
async fn prewarm_impl(&self) -> Result<()> {
|
||||
|
||||
@@ -56,13 +56,24 @@ pub fn write_postgres_conf(
|
||||
|
||||
// Add options for connecting to storage
|
||||
writeln!(file, "# Neon storage settings")?;
|
||||
if let Some(s) = &spec.pageserver_connstring {
|
||||
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?;
|
||||
if !spec.pageservers.is_empty() {
|
||||
writeln!(
|
||||
file,
|
||||
"neon.pageserver_connstring={}",
|
||||
escape_conf_value(
|
||||
&spec
|
||||
.pageservers
|
||||
.iter()
|
||||
.map(|p| format!("host={} port={}", p.host, p.port))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",")
|
||||
)
|
||||
)?;
|
||||
}
|
||||
if let Some(stripe_size) = spec.shard_stripe_size {
|
||||
writeln!(file, "neon.stripe_size={stripe_size}")?;
|
||||
}
|
||||
if !spec.safekeeper_connstrings.is_empty() {
|
||||
if !spec.safekeepers.is_empty() {
|
||||
let mut neon_safekeepers_value = String::new();
|
||||
tracing::info!(
|
||||
"safekeepers_connstrings is not zero, gen: {:?}",
|
||||
@@ -72,32 +83,45 @@ pub fn write_postgres_conf(
|
||||
if let Some(generation) = spec.safekeepers_generation {
|
||||
write!(neon_safekeepers_value, "g#{}:", generation)?;
|
||||
}
|
||||
neon_safekeepers_value.push_str(&spec.safekeeper_connstrings.join(","));
|
||||
neon_safekeepers_value.push_str(
|
||||
&spec
|
||||
.safekeepers
|
||||
.iter()
|
||||
.map(|s| format!("{}:{}", s.host.to_string(), s.port))
|
||||
.collect::<Vec<_>>()
|
||||
.join(","),
|
||||
);
|
||||
writeln!(
|
||||
file,
|
||||
"neon.safekeepers={}",
|
||||
escape_conf_value(&neon_safekeepers_value)
|
||||
)?;
|
||||
}
|
||||
if let Some(s) = &spec.tenant_id {
|
||||
writeln!(file, "neon.tenant_id={}", escape_conf_value(&s.to_string()))?;
|
||||
}
|
||||
if let Some(s) = &spec.timeline_id {
|
||||
writeln!(
|
||||
file,
|
||||
"neon.timeline_id={}",
|
||||
escape_conf_value(&s.to_string())
|
||||
)?;
|
||||
}
|
||||
if let Some(s) = &spec.project_id {
|
||||
writeln!(file, "neon.project_id={}", escape_conf_value(s))?;
|
||||
}
|
||||
if let Some(s) = &spec.branch_id {
|
||||
writeln!(file, "neon.branch_id={}", escape_conf_value(s))?;
|
||||
}
|
||||
if let Some(s) = &spec.endpoint_id {
|
||||
writeln!(file, "neon.endpoint_id={}", escape_conf_value(s))?;
|
||||
}
|
||||
writeln!(
|
||||
file,
|
||||
"neon.tenant_id={}",
|
||||
escape_conf_value(&spec.tenant_id.to_string())
|
||||
)?;
|
||||
writeln!(
|
||||
file,
|
||||
"neon.timeline_id={}",
|
||||
escape_conf_value(&spec.timeline_id.to_string())
|
||||
)?;
|
||||
writeln!(
|
||||
file,
|
||||
"neon.project_id={}",
|
||||
escape_conf_value(&spec.project_id)
|
||||
)?;
|
||||
writeln!(
|
||||
file,
|
||||
"neon.branch_id={}",
|
||||
escape_conf_value(&spec.branch_id)
|
||||
)?;
|
||||
writeln!(
|
||||
file,
|
||||
"neon.endpoint_id={}",
|
||||
escape_conf_value(&spec.endpoint_id)
|
||||
)?;
|
||||
|
||||
// tls
|
||||
if let Some(tls_config) = tls_config {
|
||||
|
||||
@@ -8,7 +8,7 @@ use http::StatusCode;
|
||||
use tokio::task;
|
||||
use tracing::info;
|
||||
|
||||
use crate::compute::{ComputeNode, ParsedSpec};
|
||||
use crate::compute::ComputeNode;
|
||||
use crate::http::JsonResponse;
|
||||
use crate::http::extract::Json;
|
||||
|
||||
@@ -22,11 +22,6 @@ pub(in crate::http) async fn configure(
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
request: Json<ConfigurationRequest>,
|
||||
) -> Response {
|
||||
let pspec = match ParsedSpec::try_from(request.0.spec) {
|
||||
Ok(p) => p,
|
||||
Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
|
||||
};
|
||||
|
||||
// XXX: wrap state update under lock in a code block. Otherwise, we will try
|
||||
// to `Send` `mut state` into the spawned thread below, which will cause
|
||||
// the following rustc error:
|
||||
@@ -43,7 +38,7 @@ pub(in crate::http) async fn configure(
|
||||
// configure request for tracing purposes.
|
||||
state.startup_span = Some(tracing::Span::current());
|
||||
|
||||
state.pspec = Some(pspec);
|
||||
state.spec = Some(request.spec.clone());
|
||||
state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
|
||||
drop(state);
|
||||
}
|
||||
|
||||
@@ -31,8 +31,7 @@ pub(in crate::http) async fn download_extension(
|
||||
|
||||
let ext = {
|
||||
let state = compute.state.lock().unwrap();
|
||||
let pspec = state.pspec.as_ref().unwrap();
|
||||
let spec = &pspec.spec;
|
||||
let spec = &state.spec.as_ref().unwrap();
|
||||
|
||||
let remote_extensions = match spec.remote_extensions.as_ref() {
|
||||
Some(r) => r,
|
||||
|
||||
@@ -21,14 +21,8 @@ impl From<&ComputeState> for ComputeStatusResponse {
|
||||
fn from(state: &ComputeState) -> Self {
|
||||
ComputeStatusResponse {
|
||||
start_time: state.start_time,
|
||||
tenant: state
|
||||
.pspec
|
||||
.as_ref()
|
||||
.map(|pspec| pspec.tenant_id.to_string()),
|
||||
timeline: state
|
||||
.pspec
|
||||
.as_ref()
|
||||
.map(|pspec| pspec.timeline_id.to_string()),
|
||||
tenant: state.spec.as_ref().map(|spec| spec.tenant_id.to_string()),
|
||||
timeline: state.spec.as_ref().map(|spec| spec.timeline_id.to_string()),
|
||||
status: state.status,
|
||||
last_active: state.last_active,
|
||||
error: state.error.clone(),
|
||||
|
||||
@@ -18,8 +18,8 @@ use crate::compute::ComputeNode;
|
||||
pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
|
||||
let (tenant_id, timeline_id, lsn) = {
|
||||
let state = compute.state.lock().unwrap();
|
||||
let spec = state.pspec.as_ref().expect("Spec must be set");
|
||||
match spec.spec.mode {
|
||||
let spec = state.spec.as_ref().expect("Spec must be set");
|
||||
match spec.mode {
|
||||
ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn),
|
||||
_ => return,
|
||||
}
|
||||
@@ -58,7 +58,7 @@ fn lsn_lease_bg_task(
|
||||
"Request succeeded, sleeping for {} seconds",
|
||||
sleep_duration.as_secs()
|
||||
);
|
||||
compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
|
||||
compute.wait_timeout_while_pageservers_unchanged(sleep_duration);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,18 +79,11 @@ fn acquire_lsn_lease_with_retry(
|
||||
let configs = {
|
||||
let state = compute.state.lock().unwrap();
|
||||
|
||||
let spec = state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = state.spec.as_ref().expect("spec must be set");
|
||||
|
||||
let conn_strings = spec.pageserver_connstr.split(',');
|
||||
|
||||
conn_strings
|
||||
.map(|connstr| {
|
||||
let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr");
|
||||
if let Some(storage_auth_token) = &spec.storage_auth_token {
|
||||
config.password(storage_auth_token.clone());
|
||||
}
|
||||
config
|
||||
})
|
||||
spec.pageservers
|
||||
.iter()
|
||||
.map(|p| postgres::Config::from(p))
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
@@ -105,7 +98,7 @@ fn acquire_lsn_lease_with_retry(
|
||||
Err(e) => {
|
||||
warn!("Failed to acquire lsn lease: {e} (attempt {attempts})");
|
||||
|
||||
compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_millis(
|
||||
compute.wait_timeout_while_pageservers_unchanged(Duration::from_millis(
|
||||
retry_period_ms as u64,
|
||||
));
|
||||
retry_period_ms *= 1.5;
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::future::Future;
|
||||
use std::iter::{empty, once};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use anyhow::Result;
|
||||
use compute_api::responses::ComputeStatus;
|
||||
use compute_api::spec::{ComputeAudit, ComputeSpec, Database, PgIdent, Role};
|
||||
use futures::future::join_all;
|
||||
@@ -74,7 +74,7 @@ impl ComputeNode {
|
||||
let mut drop_subscriptions_done = false;
|
||||
|
||||
if spec.drop_subscriptions_before_start {
|
||||
let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
|
||||
let timeline_id = self.get_timeline_id();
|
||||
|
||||
info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
|
||||
|
||||
|
||||
@@ -37,7 +37,7 @@ pub async fn ping_safekeeper(
|
||||
|
||||
// Parse result
|
||||
info!("done with {}", id);
|
||||
if let postgres::SimpleQueryMessage::Row(row) = &result[0] {
|
||||
if let tokio_postgres::SimpleQueryMessage::Row(row) = &result[0] {
|
||||
use std::str::FromStr;
|
||||
let response = TimelineStatusResponse::Ok(TimelineStatusOkResponse {
|
||||
flush_lsn: Lsn::from_str(row.get("flush_lsn").unwrap())?,
|
||||
|
||||
@@ -1493,7 +1493,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
let conf = env.get_pageserver_conf(pageserver_id).unwrap();
|
||||
let parsed = parse_host_port(&conf.listen_pg_addr).expect("Bad config");
|
||||
(
|
||||
vec![(parsed.0, parsed.1.unwrap_or(5432))],
|
||||
vec![compute_api::spec::Pageserver {
|
||||
host: parsed.0,
|
||||
port: parsed.1.unwrap_or(5432),
|
||||
}],
|
||||
// If caller is telling us what pageserver to use, this is not a tenant which is
|
||||
// fully managed by storage controller, therefore not sharded.
|
||||
DEFAULT_STRIPE_SIZE,
|
||||
@@ -1516,11 +1519,11 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
.await?;
|
||||
}
|
||||
|
||||
anyhow::Ok((
|
||||
Host::parse(&shard.listen_pg_addr)
|
||||
anyhow::Ok(compute_api::spec::Pageserver {
|
||||
host: Host::parse(&shard.listen_pg_addr)
|
||||
.expect("Storage controller reported bad hostname"),
|
||||
shard.listen_pg_port,
|
||||
))
|
||||
port: shard.listen_pg_port,
|
||||
})
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
@@ -1576,10 +1579,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
|
||||
let pageservers = if let Some(ps_id) = args.endpoint_pageserver_id {
|
||||
let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?);
|
||||
vec![(
|
||||
pageserver.pg_connection_config.host().clone(),
|
||||
pageserver.pg_connection_config.port(),
|
||||
)]
|
||||
vec![compute_api::spec::Pageserver {
|
||||
host: pageserver.pg_connection_config.host().clone(),
|
||||
port: pageserver.pg_connection_config.port(),
|
||||
}]
|
||||
} else {
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
storage_controller
|
||||
@@ -1587,12 +1590,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
.await?
|
||||
.shards
|
||||
.into_iter()
|
||||
.map(|shard| {
|
||||
(
|
||||
Host::parse(&shard.listen_pg_addr)
|
||||
.expect("Storage controller reported malformed host"),
|
||||
shard.listen_pg_port,
|
||||
)
|
||||
.map(|shard| compute_api::spec::Pageserver {
|
||||
host: Host::parse(&shard.listen_pg_addr)
|
||||
.expect("Storage controller reported malformed host"),
|
||||
port: shard.listen_pg_port,
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
@@ -52,8 +52,8 @@ use compute_api::responses::{
|
||||
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TlsConfig,
|
||||
};
|
||||
use compute_api::spec::{
|
||||
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
|
||||
RemoteExtSpec, Role,
|
||||
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, Pageserver, PgIdent,
|
||||
RemoteExtSpec, Role, Safekeeper,
|
||||
};
|
||||
use jsonwebtoken::jwk::{
|
||||
AlgorithmParameters, CommonParameters, EllipticCurve, Jwk, JwkSet, KeyAlgorithm, KeyOperations,
|
||||
@@ -606,29 +606,25 @@ impl Endpoint {
|
||||
}
|
||||
}
|
||||
|
||||
fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String {
|
||||
pageservers
|
||||
.iter()
|
||||
.map(|(host, port)| format!("postgresql://no_user@{host}:{port}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",")
|
||||
}
|
||||
fn safekeepers_from_nodes(&self, ids: Vec<NodeId>) -> Result<Vec<Safekeeper>> {
|
||||
let mut s = Vec::new();
|
||||
|
||||
/// Map safekeepers ids to the actual connection strings.
|
||||
fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
|
||||
let mut safekeeper_connstrings = Vec::new();
|
||||
if self.mode == ComputeMode::Primary {
|
||||
for sk_id in sk_ids {
|
||||
for id in ids {
|
||||
let sk = self
|
||||
.env
|
||||
.safekeepers
|
||||
.iter()
|
||||
.find(|node| node.id == sk_id)
|
||||
.ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
|
||||
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
|
||||
.find(|node| node.id == id)
|
||||
.ok_or_else(|| anyhow!("safekeeper {id} does not exist"))?;
|
||||
s.push(Safekeeper {
|
||||
host: Host::parse("127.0.0.1")?,
|
||||
port: sk.get_compute_port(),
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(safekeeper_connstrings)
|
||||
|
||||
Ok(s)
|
||||
}
|
||||
|
||||
/// Generate a JWT with the correct claims.
|
||||
@@ -654,7 +650,7 @@ impl Endpoint {
|
||||
endpoint_storage_addr: String,
|
||||
safekeepers_generation: Option<SafekeeperGeneration>,
|
||||
safekeepers: Vec<NodeId>,
|
||||
pageservers: Vec<(Host, u16)>,
|
||||
pageservers: Vec<Pageserver>,
|
||||
remote_ext_base_url: Option<&String>,
|
||||
shard_stripe_size: usize,
|
||||
create_test_user: bool,
|
||||
@@ -672,11 +668,6 @@ impl Endpoint {
|
||||
std::fs::remove_dir_all(self.pgdata())?;
|
||||
}
|
||||
|
||||
let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
|
||||
assert!(!pageserver_connstring.is_empty());
|
||||
|
||||
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
|
||||
|
||||
// check for file remote_extensions_spec.json
|
||||
// if it is present, read it and pass to compute_ctl
|
||||
let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json");
|
||||
@@ -727,15 +718,34 @@ impl Endpoint {
|
||||
postgresql_conf: Some(postgresql_conf.clone()),
|
||||
},
|
||||
delta_operations: None,
|
||||
tenant_id: Some(self.tenant_id),
|
||||
timeline_id: Some(self.timeline_id),
|
||||
project_id: None,
|
||||
branch_id: None,
|
||||
endpoint_id: Some(self.endpoint_id.clone()),
|
||||
tenant_id: self.tenant_id.clone(),
|
||||
timeline_id: self.timeline_id.clone(),
|
||||
project_id: self.tenant_id.to_string(),
|
||||
branch_id: self.timeline_id.to_string(),
|
||||
endpoint_id: self.endpoint_id.clone(),
|
||||
mode: self.mode,
|
||||
pageserver_connstring: Some(pageserver_connstring),
|
||||
pageservers,
|
||||
safekeepers: {
|
||||
let mut s = Vec::new();
|
||||
|
||||
if self.mode == ComputeMode::Primary {
|
||||
for id in safekeepers {
|
||||
let sk = self
|
||||
.env
|
||||
.safekeepers
|
||||
.iter()
|
||||
.find(|node| node.id == id)
|
||||
.ok_or_else(|| anyhow!("safekeeper {id} does not exist"))?;
|
||||
s.push(Safekeeper {
|
||||
host: Host::parse("127.0.0.1")?,
|
||||
port: sk.get_compute_port(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
s
|
||||
},
|
||||
safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()),
|
||||
safekeeper_connstrings,
|
||||
storage_auth_token: auth_token.clone(),
|
||||
remote_extensions,
|
||||
pgbouncer_settings: None,
|
||||
@@ -939,7 +949,7 @@ impl Endpoint {
|
||||
|
||||
pub async fn reconfigure(
|
||||
&self,
|
||||
mut pageservers: Vec<(Host, u16)>,
|
||||
pageservers: Vec<Pageserver>,
|
||||
stripe_size: Option<ShardStripeSize>,
|
||||
safekeepers: Option<Vec<NodeId>>,
|
||||
) -> Result<()> {
|
||||
@@ -958,30 +968,24 @@ impl Endpoint {
|
||||
if pageservers.is_empty() {
|
||||
let storage_controller = StorageController::from_env(&self.env);
|
||||
let locate_result = storage_controller.tenant_locate(self.tenant_id).await?;
|
||||
pageservers = locate_result
|
||||
spec.pageservers = locate_result
|
||||
.shards
|
||||
.into_iter()
|
||||
.map(|shard| {
|
||||
(
|
||||
Host::parse(&shard.listen_pg_addr)
|
||||
.expect("Storage controller reported bad hostname"),
|
||||
shard.listen_pg_port,
|
||||
)
|
||||
.map(|shard| Pageserver {
|
||||
host: Host::parse(&shard.listen_pg_addr)
|
||||
.expect("Storage controller reported bad hostname"),
|
||||
port: shard.listen_pg_port,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
}
|
||||
|
||||
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
|
||||
assert!(!pageserver_connstr.is_empty());
|
||||
spec.pageserver_connstring = Some(pageserver_connstr);
|
||||
if stripe_size.is_some() {
|
||||
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
|
||||
}
|
||||
|
||||
// If safekeepers are not specified, don't change them.
|
||||
if let Some(safekeepers) = safekeepers {
|
||||
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
|
||||
spec.safekeeper_connstrings = safekeeper_connstrings;
|
||||
spec.safekeepers = self.safekeepers_from_nodes(safekeepers)?;
|
||||
}
|
||||
|
||||
let client = reqwest::Client::builder()
|
||||
|
||||
@@ -9,8 +9,11 @@ anyhow.workspace = true
|
||||
chrono.workspace = true
|
||||
indexmap.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
postgres.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
url.workspace = true
|
||||
regex.workspace = true
|
||||
|
||||
utils = { path = "../utils" }
|
||||
|
||||
@@ -9,7 +9,8 @@ use indexmap::IndexMap;
|
||||
use regex::Regex;
|
||||
use remote_storage::RemotePath;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use url::Host;
|
||||
use utils::id::{BranchId, EndpointId, ProjectId, TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::responses::TlsConfig;
|
||||
@@ -21,13 +22,77 @@ pub type PgIdent = String;
|
||||
/// String type alias representing Postgres extension version
|
||||
pub type ExtVersion = String;
|
||||
|
||||
/// Pageserver settings.
|
||||
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
|
||||
pub struct Pageserver {
|
||||
/// Hostname of the pageserver.
|
||||
pub host: Host,
|
||||
|
||||
/// Port that the pageserver listens on.
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
impl From<&Pageserver> for postgres::Config {
|
||||
fn from(ps: &Pageserver) -> Self {
|
||||
let mut config = postgres::Config::new();
|
||||
|
||||
config.host(&ps.host.to_string());
|
||||
config.port(ps.port);
|
||||
|
||||
config
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&Pageserver> for tokio_postgres::Config {
|
||||
fn from(ps: &Pageserver) -> Self {
|
||||
let mut config = tokio_postgres::Config::new();
|
||||
|
||||
config.host(&ps.host.to_string());
|
||||
config.port(ps.port);
|
||||
|
||||
config
|
||||
}
|
||||
}
|
||||
|
||||
/// Safekeeper settings.
|
||||
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
|
||||
pub struct Safekeeper {
|
||||
/// Hostname of the safekeeper.
|
||||
pub host: Host,
|
||||
|
||||
/// Port that the safekeeper listens on.
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
impl From<&Safekeeper> for postgres::Config {
|
||||
fn from(sk: &Safekeeper) -> Self {
|
||||
let mut config = postgres::Config::new();
|
||||
|
||||
config.host(&sk.host.to_string());
|
||||
config.port(sk.port);
|
||||
|
||||
config
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&Safekeeper> for tokio_postgres::Config {
|
||||
fn from(sk: &Safekeeper) -> Self {
|
||||
let mut config = tokio_postgres::Config::new();
|
||||
|
||||
config.host(&sk.host.to_string());
|
||||
config.port(sk.port);
|
||||
|
||||
config
|
||||
}
|
||||
}
|
||||
|
||||
fn default_reconfigure_concurrency() -> usize {
|
||||
1
|
||||
}
|
||||
|
||||
/// Cluster spec or configuration represented as an optional number of
|
||||
/// delta operations + final cluster state description.
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub struct ComputeSpec {
|
||||
pub format_version: f32,
|
||||
|
||||
@@ -90,25 +155,13 @@ pub struct ComputeSpec {
|
||||
|
||||
// Information needed to connect to the storage layer.
|
||||
//
|
||||
// `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.
|
||||
//
|
||||
// Depending on `mode`, this can be a primary read-write node, a read-only
|
||||
// replica, or a read-only node pinned at an older LSN.
|
||||
// `safekeeper_connstrings` must be set for a primary.
|
||||
//
|
||||
// For backwards compatibility, the control plane may leave out all of
|
||||
// these, and instead set the "neon.tenant_id", "neon.timeline_id",
|
||||
// etc. GUCs in cluster.settings. TODO: Once the control plane has been
|
||||
// updated to fill these fields, we can make these non optional.
|
||||
pub tenant_id: Option<TenantId>,
|
||||
pub timeline_id: Option<TimelineId>,
|
||||
pub pageserver_connstring: Option<String>,
|
||||
pub pageservers: Vec<Pageserver>,
|
||||
|
||||
// More neon ids that we expose to the compute_ctl
|
||||
// and to postgres as neon extension GUCs.
|
||||
pub project_id: Option<String>,
|
||||
pub branch_id: Option<String>,
|
||||
pub endpoint_id: Option<String>,
|
||||
#[serde(default)]
|
||||
pub safekeepers_generation: Option<u32>,
|
||||
|
||||
/// Safekeeper membership config generation. It is put in
|
||||
/// neon.safekeepers GUC and serves two purposes:
|
||||
@@ -120,9 +173,18 @@ pub struct ComputeSpec {
|
||||
/// Note: it could be SafekeeperGeneration, but this needs linking
|
||||
/// compute_ctl with postgres_ffi.
|
||||
#[serde(default)]
|
||||
pub safekeepers_generation: Option<u32>,
|
||||
#[serde(default)]
|
||||
pub safekeeper_connstrings: Vec<String>,
|
||||
pub safekeepers: Vec<Safekeeper>,
|
||||
|
||||
/// The Neon tenant ID. Exposed to Postgres as `neon.tenant_id`.
|
||||
pub tenant_id: TenantId,
|
||||
/// The Neon timeline ID. Exposed to Postgres as `neon.timeline_id`.
|
||||
pub timeline_id: TimelineId,
|
||||
/// The Neon project ID. Exposed to Postgres as `neon.project_id`.
|
||||
pub project_id: ProjectId,
|
||||
/// The Neon branch ID. Exposed to Postgres as `neon.branch_id`.
|
||||
pub branch_id: BranchId,
|
||||
/// The Neon endpoint ID. Exposed to Postgres as `neon.endpoint_id`.
|
||||
pub endpoint_id: EndpointId,
|
||||
|
||||
#[serde(default)]
|
||||
pub mode: ComputeMode,
|
||||
|
||||
@@ -295,7 +295,11 @@ pub struct TenantId(Id);
|
||||
|
||||
id_newtype!(TenantId);
|
||||
|
||||
/// If needed, reuse small string from proxy/src/types.rs
|
||||
/// Type representing a project ID.
|
||||
pub type ProjectId = String;
|
||||
/// Type representing a branch ID.
|
||||
pub type BranchId = String;
|
||||
/// Type representing an endpoint ID.
|
||||
pub type EndpointId = String;
|
||||
|
||||
// A pair uniquely identifying Neon instance.
|
||||
|
||||
@@ -65,8 +65,9 @@ diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connec
|
||||
diesel_migrations = { version = "2.2.0" }
|
||||
scoped-futures = "0.1.4"
|
||||
|
||||
compute_api = { path = "../libs/compute_api/" }
|
||||
http-utils = { path = "../libs/http-utils/" }
|
||||
utils = { path = "../libs/utils/" }
|
||||
metrics = { path = "../libs/metrics/" }
|
||||
control_plane = { path = "../control_plane" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
|
||||
@@ -428,7 +428,10 @@ impl ComputeHook {
|
||||
.expect("Unknown pageserver");
|
||||
let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
|
||||
.expect("Unable to parse listen_pg_addr");
|
||||
(pg_host, pg_port.unwrap_or(5432))
|
||||
compute_api::spec::Pageserver {
|
||||
host: pg_host,
|
||||
port: pg_port.unwrap_or(5432),
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user