Compare commits

...

4 Commits

Author SHA1 Message Date
Tristan Partin
d7d3fb332f Remove notion of ParsedSpec
Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-06-09 11:33:46 -05:00
Tristan Partin
c37ce9b69c Clean up implementation of ComputeNode::has_feature()
Option::is_some_and() is perfect for what this function does.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-06-09 11:03:51 -05:00
Tristan Partin
681edf3983 Mark compute_ctl::main as #[tokio::main]
Historically, it was not the case, but we use async code extensively
within compute_ctl, so might as well make it easy for people in the
future to add async code.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-06-09 11:03:51 -05:00
Tristan Partin
ab898e40b0 Move get_config() to a method of Cli
We were already basically using it as a method. All inputs to the
function were a function of the CLI arguments anyway.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-06-09 11:03:51 -05:00
18 changed files with 394 additions and 410 deletions

4
Cargo.lock generated
View File

@@ -1278,10 +1278,13 @@ dependencies = [
"chrono",
"indexmap 2.9.0",
"jsonwebtoken",
"postgres",
"regex",
"remote_storage",
"serde",
"serde_json",
"tokio-postgres",
"url",
"utils",
]
@@ -6747,6 +6750,7 @@ dependencies = [
"chrono",
"clap",
"clashmap",
"compute_api",
"control_plane",
"cron",
"diesel",

View File

@@ -145,28 +145,47 @@ impl Cli {
}
}
fn main() -> Result<()> {
impl Cli {
pub fn get_config(&self) -> Result<ComputeConfig> {
// First, read the config from the path if provided
if let Some(ref config) = self.config {
let file = File::open(config)?;
return Ok(serde_json::from_reader(&file)?);
}
// If the config wasn't provided in the CLI arguments, then retrieve it from
// the control plane
match get_config_from_control_plane(
self.control_plane_uri.as_ref().unwrap(),
&self.compute_id,
) {
Ok(config) => Ok(config),
Err(e) => {
error!(
"cannot get response from control plane: {}\n\
neither spec nor confirmation that compute is in the Empty state was received",
e
);
Err(e)
}
}
}
}
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
let scenario = failpoint_support::init();
// For historical reasons, the main thread that processes the config and launches postgres
// is synchronous, but we always have this tokio runtime available and we "enter" it so
// that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
// from all parts of compute_ctl.
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
let _rt_guard = runtime.enter();
runtime.block_on(init())?;
init().await?;
// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
let config = get_config(&cli)?;
let config = cli.get_config()?;
let compute_node = ComputeNode::new(
ComputeNodeParams {
@@ -191,7 +210,7 @@ fn main() -> Result<()> {
config,
)?;
let exit_code = compute_node.run()?;
let exit_code = compute_node.run().await?;
scenario.teardown();
@@ -213,28 +232,6 @@ async fn init() -> Result<()> {
Ok(())
}
fn get_config(cli: &Cli) -> Result<ComputeConfig> {
// First, read the config from the path if provided
if let Some(ref config) = cli.config {
let file = File::open(config)?;
return Ok(serde_json::from_reader(&file)?);
}
// If the config wasn't provided in the CLI arguments, then retrieve it from
// the control plane
match get_config_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
Ok(config) => Ok(config),
Err(e) => {
error!(
"cannot get response from control plane: {}\n\
neither spec nor confirmation that compute is in the Empty state was received",
e
);
Err(e)
}
}
}
fn deinit_and_exit(exit_code: Option<i32>) -> ! {
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit. Shutting down OTEL tracing provider may

View File

@@ -15,12 +15,8 @@ use itertools::Itertools;
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use once_cell::sync::Lazy;
use postgres;
use postgres::NoTls;
use postgres::error::SqlState;
use remote_storage::{DownloadError, RemotePath};
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::Path;
use std::process::{Command, Stdio};
@@ -30,9 +26,9 @@ use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
use tokio::spawn;
use tokio_postgres::{NoTls, error::SqlState};
use tracing::{Instrument, debug, error, info, instrument, warn};
use url::Url;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::measured_stream::MeasuredReader;
@@ -144,7 +140,7 @@ pub struct ComputeState {
/// Compute spec. This can be received from the CLI or - more likely -
/// passed by the control plane with a /configure HTTP request.
pub pspec: Option<ParsedSpec>,
pub spec: Option<ComputeSpec>,
/// If the spec is passed by a /configure request, 'startup_span' is the
/// /configure request's tracing span. The main thread enters it when it
@@ -171,7 +167,7 @@ impl ComputeState {
status: ComputeStatus::Empty,
last_active: None,
error: None,
pspec: None,
spec: None,
startup_span: None,
metrics: ComputeMetrics::default(),
lfc_prewarm_state: LfcPrewarmState::default(),
@@ -203,94 +199,6 @@ impl Default for ComputeState {
}
}
#[derive(Clone, Debug)]
pub struct ParsedSpec {
pub spec: ComputeSpec,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub pageserver_connstr: String,
pub safekeeper_connstrings: Vec<String>,
pub storage_auth_token: Option<String>,
pub endpoint_storage_addr: Option<SocketAddr>,
pub endpoint_storage_token: Option<String>,
}
impl TryFrom<ComputeSpec> for ParsedSpec {
type Error = String;
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
// Extract the options from the spec file that are needed to connect to
// the storage system.
//
// For backwards-compatibility, the top-level fields in the spec file
// may be empty. In that case, we need to dig them from the GUCs in the
// cluster.settings field.
let pageserver_connstr = spec
.pageserver_connstring
.clone()
.or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
.ok_or("pageserver connstr should be provided")?;
let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() {
if matches!(spec.mode, ComputeMode::Primary) {
spec.cluster
.settings
.find("neon.safekeepers")
.ok_or("safekeeper connstrings should be provided")?
.split(',')
.map(|str| str.to_string())
.collect()
} else {
vec![]
}
} else {
spec.safekeeper_connstrings.clone()
};
let storage_auth_token = spec.storage_auth_token.clone();
let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
tenant_id
} else {
spec.cluster
.settings
.find("neon.tenant_id")
.ok_or("tenant id should be provided")
.map(|s| TenantId::from_str(&s))?
.or(Err("invalid tenant id"))?
};
let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
timeline_id
} else {
spec.cluster
.settings
.find("neon.timeline_id")
.ok_or("timeline id should be provided")
.map(|s| TimelineId::from_str(&s))?
.or(Err("invalid timeline id"))?
};
let endpoint_storage_addr: Option<SocketAddr> = spec
.endpoint_storage_addr
.clone()
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_addr"))
.unwrap_or_default()
.parse()
.ok();
let endpoint_storage_token = spec
.endpoint_storage_token
.clone()
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_token"));
Ok(ParsedSpec {
spec,
pageserver_connstr,
safekeeper_connstrings,
storage_auth_token,
tenant_id,
timeline_id,
endpoint_storage_addr,
endpoint_storage_token,
})
}
}
/// If we are a VM, returns a [`Command`] that will run in the `neon-postgres`
/// cgroup. Otherwise returns the default `Command::new(cmd)`
///
@@ -368,10 +276,7 @@ impl ComputeNode {
tokio_conn_conf.options(&options);
let mut new_state = ComputeState::new();
if let Some(spec) = config.spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
new_state.pspec = Some(pspec);
}
new_state.spec = config.spec;
Ok(ComputeNode {
params,
@@ -386,10 +291,10 @@ impl ComputeNode {
/// Top-level control flow of compute_ctl. Returns a process exit code we should
/// exit with.
pub fn run(self) -> Result<Option<i32>> {
pub async fn run(self) -> Result<Option<i32>> {
let this = Arc::new(self);
let cli_spec = this.state.lock().unwrap().pspec.clone();
let cli_spec = this.state.lock().unwrap().spec.clone();
// If this is a pooled VM, prewarm before starting HTTP server and becoming
// available for binding. Prewarming helps Postgres start quicker later,
@@ -425,7 +330,7 @@ impl ComputeNode {
// If we got a spec from the CLI already, use that. Otherwise wait for the
// control plane to pass it to us with a /configure HTTP request
let pspec = if let Some(cli_spec) = cli_spec {
let spec = if let Some(cli_spec) = cli_spec {
cli_spec
} else {
this.wait_spec()?
@@ -438,11 +343,11 @@ impl ComputeNode {
let mut vm_monitor = None;
let mut pg_process: Option<PostgresHandle> = None;
match this.start_compute(&mut pg_process) {
match this.start_compute(&mut pg_process).await {
Ok(()) => {
// Success! Launch remaining services (just vm-monitor currently)
vm_monitor =
Some(this.start_vm_monitor(pspec.spec.disable_lfc_resizing.unwrap_or(false)));
Some(this.start_vm_monitor(spec.disable_lfc_resizing.unwrap_or(false)));
}
Err(err) => {
// Something went wrong with the startup. Log it and expose the error to
@@ -487,7 +392,7 @@ impl ComputeNode {
}
// Reap the postgres process
delay_exit |= this.cleanup_after_postgres_exit()?;
delay_exit |= this.cleanup_after_postgres_exit().await?;
// If launch failed, keep serving HTTP requests for a while, so the cloud
// control plane can get the actual error.
@@ -498,7 +403,7 @@ impl ComputeNode {
Ok(exit_code)
}
pub fn wait_spec(&self) -> Result<ParsedSpec> {
pub fn wait_spec(&self) -> Result<ComputeSpec> {
info!("no compute spec provided, waiting");
let mut state = self.state.lock().unwrap();
while state.status != ComputeStatus::ConfigurationPending {
@@ -506,7 +411,7 @@ impl ComputeNode {
}
info!("got spec, continue configuration");
let spec = state.pspec.as_ref().unwrap().clone();
let spec = state.spec.as_ref().unwrap().clone();
// Record for how long we slept waiting for the spec.
let now = Utc::now();
@@ -539,7 +444,7 @@ impl ComputeNode {
///
/// Note that this is in the critical path of a compute cold start. Keep this fast.
/// Try to do things concurrently, to hide the latencies.
fn start_compute(self: &Arc<Self>, pg_handle: &mut Option<PostgresHandle>) -> Result<()> {
async fn start_compute(self: &Arc<Self>, pg_handle: &mut Option<PostgresHandle>) -> Result<()> {
let compute_state: ComputeState;
let start_compute_span;
@@ -574,18 +479,17 @@ impl ComputeNode {
compute_state = state_guard.clone()
}
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = compute_state.spec.as_ref().expect("spec must be set");
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}, project {}, branch {}, endpoint {}, features {:?}, spec.remote_extensions {:?}",
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
pspec.tenant_id,
pspec.timeline_id,
pspec.spec.project_id.as_deref().unwrap_or("None"),
pspec.spec.branch_id.as_deref().unwrap_or("None"),
pspec.spec.endpoint_id.as_deref().unwrap_or("None"),
pspec.spec.features,
pspec.spec.remote_extensions,
"starting compute for operation {}, tenant {}, timeline {}, project {}, branch {}, endpoint {}, features {:?}, spec.remote_extensions {:?}",
spec.operation_uuid.as_deref().unwrap_or("None"),
spec.tenant_id,
spec.timeline_id,
spec.project_id,
spec.branch_id,
spec.endpoint_id,
spec.features,
spec.remote_extensions,
);
////// PRE-STARTUP PHASE: things that need to be finished before we start the Postgres process
@@ -606,8 +510,8 @@ impl ComputeNode {
let tls_config = self.tls_config(&pspec.spec);
// If there are any remote extensions in shared_preload_libraries, start downloading them
if pspec.spec.remote_extensions.is_some() {
let (this, spec) = (self.clone(), pspec.spec.clone());
if spec.remote_extensions.is_some() {
let (this, spec) = (self.clone(), spec.clone());
pre_tasks.spawn(async move {
this.download_preload_extensions(&spec)
.in_current_span()
@@ -618,13 +522,11 @@ impl ComputeNode {
// Prepare pgdata directory. This downloads the basebackup, among other things.
{
let (this, cs) = (self.clone(), compute_state.clone());
pre_tasks.spawn_blocking_child(move || this.prepare_pgdata(&cs));
pre_tasks.spawn(async move { this.prepare_pgdata(&cs).await });
}
// Resize swap to the desired size if the compute spec says so
if let (Some(size_bytes), true) =
(pspec.spec.swap_size_bytes, self.params.resize_swap_on_bind)
{
if let (Some(size_bytes), true) = (spec.swap_size_bytes, self.params.resize_swap_on_bind) {
pre_tasks.spawn_blocking_child(move || {
// To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
// *before* starting postgres.
@@ -642,7 +544,7 @@ impl ComputeNode {
// Set disk quota if the compute spec says so
if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) = (
pspec.spec.disk_quota_bytes,
spec.disk_quota_bytes,
self.params.set_disk_quota_for_fs.as_ref(),
) {
let disk_quota_fs_mountpoint = disk_quota_fs_mountpoint.clone();
@@ -657,7 +559,7 @@ impl ComputeNode {
}
// tune pgbouncer
if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
if let Some(pgbouncer_settings) = &spec.pgbouncer_settings {
info!("tuning pgbouncer");
let pgbouncer_settings = pgbouncer_settings.clone();
@@ -675,7 +577,7 @@ impl ComputeNode {
}
// configure local_proxy
if let Some(local_proxy) = &pspec.spec.local_proxy_config {
if let Some(local_proxy) = &spec.local_proxy_config {
info!("configuring local_proxy");
// Spawn a background task to do the configuration,
@@ -693,7 +595,7 @@ impl ComputeNode {
}
// Configure and start rsyslog for compliance audit logging
match pspec.spec.audit_log_level {
match spec.audit_log_level {
ComputeAudit::Hipaa | ComputeAudit::Extended | ComputeAudit::Full => {
let remote_endpoint =
std::env::var("AUDIT_LOGGING_ENDPOINT").unwrap_or("".to_string());
@@ -704,16 +606,10 @@ impl ComputeNode {
let log_directory_path = Path::new(&self.params.pgdata).join("log");
let log_directory_path = log_directory_path.to_string_lossy().to_string();
// Add project_id,endpoint_id to identify the logs.
//
// These ids are passed from cplane,
let endpoint_id = pspec.spec.endpoint_id.as_deref().unwrap_or("");
let project_id = pspec.spec.project_id.as_deref().unwrap_or("");
configure_audit_rsyslog(
log_directory_path.clone(),
endpoint_id,
project_id,
&spec.endpoint_id,
&spec.project_id,
&remote_endpoint,
)?;
@@ -724,7 +620,7 @@ impl ComputeNode {
}
// Configure and start rsyslog for Postgres logs export
let conf = PostgresLogsRsyslogConfig::new(pspec.spec.logs_export_host.as_deref());
let conf = PostgresLogsRsyslogConfig::new(spec.logs_export_host.as_deref());
configure_postgres_logs_export(conf)?;
// Launch remaining service threads
@@ -732,21 +628,20 @@ impl ComputeNode {
let _configurator_handle = launch_configurator(self);
// Wait for all the pre-tasks to finish before starting postgres
let rt = tokio::runtime::Handle::current();
while let Some(res) = rt.block_on(pre_tasks.join_next()) {
while let Some(res) = pre_tasks.join_next().await {
res??;
}
////// START POSTGRES
let start_time = Utc::now();
let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
let pg_process = self.start_postgres(spec.storage_auth_token.clone())?;
let postmaster_pid = pg_process.pid();
*pg_handle = Some(pg_process);
// If this is a primary endpoint, perform some post-startup configuration before
// opening it up for the world.
let config_time = Utc::now();
if pspec.spec.mode == ComputeMode::Primary {
if spec.mode == ComputeMode::Primary {
self.configure_as_primary(&compute_state)?;
let conf = self.get_tokio_conn_conf(None);
@@ -787,6 +682,7 @@ impl ComputeNode {
if pspec.spec.autoprewarm {
self.prewarm_lfc();
}
Ok(())
}
@@ -862,14 +758,14 @@ impl ComputeNode {
}
}
fn cleanup_after_postgres_exit(&self) -> Result<bool> {
async fn cleanup_after_postgres_exit(&self) -> Result<bool> {
// Maybe sync safekeepers again, to speed up next startup
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
let spec = compute_state.spec.as_ref().expect("spec must be set");
if matches!(spec.mode, compute_api::spec::ComputeMode::Primary) {
info!("syncing safekeepers on shutdown");
let storage_auth_token = pspec.storage_auth_token.clone();
let lsn = self.sync_safekeepers(storage_auth_token)?;
let storage_auth_token = spec.storage_auth_token.clone();
let lsn = self.sync_safekeepers(storage_auth_token).await?;
info!("synced safekeepers at lsn {lsn}");
}
@@ -892,13 +788,12 @@ impl ComputeNode {
/// Check that compute node has corresponding feature enabled.
pub fn has_feature(&self, feature: ComputeFeature) -> bool {
let state = self.state.lock().unwrap();
if let Some(s) = state.pspec.as_ref() {
s.spec.features.contains(&feature)
} else {
false
}
self.state
.lock()
.unwrap()
.spec
.as_ref()
.is_some_and(|spec| spec.features.contains(&feature))
}
pub fn set_status(&self, status: ComputeStatus) {
@@ -915,13 +810,15 @@ impl ComputeNode {
self.state.lock().unwrap().status
}
pub fn get_timeline_id(&self) -> Option<TimelineId> {
pub fn get_timeline_id(&self) -> TimelineId {
self.state
.lock()
.unwrap()
.pspec
.spec
.as_ref()
.map(|s| s.timeline_id)
.unwrap()
.timeline_id
.clone()
}
// Remove `pgdata` directory and create it again with right permissions.
@@ -940,11 +837,10 @@ impl ComputeNode {
// unarchive it to `pgdata` directory overriding all its previous content.
#[instrument(skip_all, fields(%lsn))]
fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = compute_state.spec.as_ref().expect("spec must be set");
let start_time = Instant::now();
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
let mut config = postgres::Config::from_str(shard0_connstr)?;
let mut config = postgres::Config::from(&spec.pageservers[0]);
// Use the storage auth token from the config file, if given.
// Note: this overrides any password set in the connection string.
@@ -956,20 +852,17 @@ impl ComputeNode {
}
config.application_name("compute_ctl");
if let Some(spec) = &compute_state.pspec {
config.options(&format!(
"-c neon.compute_mode={}",
spec.spec.mode.to_type_str()
));
if let Some(spec) = &compute_state.spec {
config.options(&format!("-c neon.compute_mode={}", spec.mode.to_type_str()));
}
// Connect to pageserver
let mut client = config.connect(NoTls)?;
let mut client = config.connect(postgres::NoTls)?;
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
let basebackup_cmd = match lsn {
Lsn(0) => {
if spec.spec.mode != ComputeMode::Primary {
if spec.mode != ComputeMode::Primary {
format!(
"basebackup {} {} --gzip --replica",
spec.tenant_id, spec.timeline_id
@@ -979,7 +872,7 @@ impl ComputeNode {
}
}
_ => {
if spec.spec.mode != ComputeMode::Primary {
if spec.mode != ComputeMode::Primary {
format!(
"basebackup {} {} {} --gzip --replica",
spec.tenant_id, spec.timeline_id, lsn
@@ -1055,35 +948,34 @@ impl ComputeNode {
compute_state: &ComputeState,
) -> Result<Option<Lsn>> {
// Construct a connection config for each safekeeper
let pspec: ParsedSpec = compute_state
.pspec
let spec = compute_state
.spec
.as_ref()
.expect("spec must be set")
.clone();
let sk_connstrs: Vec<String> = pspec.safekeeper_connstrings.clone();
let sk_configs = sk_connstrs.into_iter().map(|connstr| {
// Format connstr
let id = connstr.clone();
let connstr = format!("postgresql://no_user@{}", connstr);
let options = format!(
"-c timeline_id={} tenant_id={}",
pspec.timeline_id, pspec.tenant_id
);
let safekeepers = spec
.safekeepers
.iter()
.map(|s| {
let mut config = tokio_postgres::Config::from(s);
// Construct client
let mut config = tokio_postgres::Config::from_str(&connstr).unwrap();
config.options(&options);
if let Some(storage_auth_token) = pspec.storage_auth_token.clone() {
config.password(storage_auth_token);
}
config.user("no_user");
config.options(&format!(
"-c timeline_id={} tenant_id={}",
spec.timeline_id, spec.tenant_id
));
if let Some(storage_auth_token) = &spec.storage_auth_token {
config.password(storage_auth_token);
}
(id, config)
});
(format!("{}:{}", s.host, s.port), config)
})
.collect::<Vec<(String, tokio_postgres::Config)>>();
// Create task set to query all safekeepers
let mut tasks = FuturesUnordered::new();
let quorum = sk_configs.len() / 2 + 1;
for (id, config) in sk_configs {
let quorum = safekeepers.len() / 2 + 1;
for (id, config) in safekeepers {
let timeout = tokio::time::Duration::from_millis(100);
let task = tokio::time::timeout(timeout, ping_safekeeper(id, config));
tasks.push(tokio::spawn(task));
@@ -1128,11 +1020,13 @@ impl ComputeNode {
// Fast path for sync_safekeepers. If they're already synced we get the lsn
// in one roundtrip. If not, we should do a full sync_safekeepers.
#[instrument(skip_all)]
pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
pub async fn check_safekeepers_synced(
&self,
compute_state: &ComputeState,
) -> Result<Option<Lsn>> {
let start_time = Utc::now();
let rt = tokio::runtime::Handle::current();
let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
let result = self.check_safekeepers_synced_async(compute_state).await;
// Record runtime
self.state.lock().unwrap().metrics.sync_sk_check_ms = Utc::now()
@@ -1146,7 +1040,7 @@ impl ComputeNode {
// Run `postgres` in a special mode with `--sync-safekeepers` argument
// and return the reported LSN back to the caller.
#[instrument(skip_all)]
pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
pub async fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
let start_time = Utc::now();
let mut sync_handle = maybe_cgexec(&self.params.pgbin)
@@ -1178,8 +1072,8 @@ impl ComputeNode {
SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst);
// Process has exited, so we can join the logs thread.
let _ = tokio::runtime::Handle::current()
.block_on(logs_handle)
let _ = logs_handle
.await
.map_err(|e| tracing::error!("log task panicked: {:?}", e));
if !sync_output.status.success() {
@@ -1205,9 +1099,8 @@ impl ComputeNode {
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip_all)]
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
pub async fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
let spec = compute_state.spec.as_ref().expect("spec must be set");
let pgdata_path = Path::new(&self.params.pgdata);
let tls_config = self.tls_config(&pspec.spec);
@@ -1216,7 +1109,7 @@ impl ComputeNode {
self.create_pgdata()?;
config::write_postgres_conf(
pgdata_path,
&pspec.spec,
spec,
self.params.internal_http_port,
tls_config,
)?;
@@ -1227,11 +1120,13 @@ impl ComputeNode {
let lsn = match spec.mode {
ComputeMode::Primary => {
info!("checking if safekeepers are synced");
let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state) {
let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state).await
{
lsn
} else {
info!("starting safekeepers syncing");
self.sync_safekeepers(pspec.storage_auth_token.clone())
self.sync_safekeepers(spec.storage_auth_token.clone())
.await
.with_context(|| "failed to sync safekeepers")?
};
info!("safekeepers synced at LSN {}", lsn);
@@ -1248,13 +1143,13 @@ impl ComputeNode {
};
info!(
"getting basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
"getting basebackup@{} from pageserver {}:{}",
lsn, spec.pageservers[0].host, spec.pageservers[0].port
);
self.get_basebackup(compute_state, lsn).with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
"failed to get basebackup@{} from pageserver {}:{}",
lsn, spec.pageservers[0].host, spec.pageservers[0].port
)
})?;
@@ -1536,10 +1431,9 @@ impl ComputeNode {
let conf = Arc::new(conf);
let spec = Arc::new(
compute_state
.pspec
.spec
.as_ref()
.expect("spec must be set")
.spec
.clone(),
);
@@ -1608,7 +1502,7 @@ impl ComputeNode {
/// as it's used to reconfigure a previously started and configured Postgres node.
#[instrument(skip_all)]
pub fn reconfigure(&self) -> Result<()> {
let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
let spec = self.state.lock().unwrap().spec.as_ref().unwrap().clone();
let tls_config = self.tls_config(&spec);
@@ -1690,10 +1584,10 @@ impl ComputeNode {
#[instrument(skip_all)]
pub fn configure_as_primary(&self, compute_state: &ComputeState) -> Result<()> {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = compute_state.spec.as_ref().expect("spec must be set");
assert!(pspec.spec.mode == ComputeMode::Primary);
if !pspec.spec.skip_pg_catalog_updates {
assert!(spec.mode == ComputeMode::Primary);
if !spec.skip_pg_catalog_updates {
let pgdata_path = Path::new(&self.params.pgdata);
// temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are applying config:
@@ -2189,24 +2083,23 @@ LIMIT 100",
/// the pageserver connection strings has changed.
///
/// The operation will time out after a specified duration.
pub fn wait_timeout_while_pageserver_connstr_unchanged(&self, duration: Duration) {
pub fn wait_timeout_while_pageservers_unchanged(&self, duration: Duration) {
let state = self.state.lock().unwrap();
let old_pageserver_connstr = state
.pspec
let old_pageservers = state
.spec
.as_ref()
.expect("spec must be set")
.pageserver_connstr
.pageservers
.clone();
let mut unchanged = true;
let _ = self
.state_changed
.wait_timeout_while(state, duration, |s| {
let pageserver_connstr = &s
.pspec
.as_ref()
.expect("spec must be set")
.pageserver_connstr;
unchanged = pageserver_connstr == &old_pageserver_connstr;
let current_pageservers = &s.spec.as_ref().expect("spec must be set").pageservers;
unchanged = current_pageservers
.iter()
.zip(&old_pageservers)
.all(|(c, o)| c == o);
unchanged
})
.unwrap();

View File

@@ -3,6 +3,7 @@ use anyhow::{Context, Result, bail};
use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
use compute_api::responses::LfcOffloadState;
use compute_api::responses::LfcPrewarmState;
use compute_api::spec::ComputeSpec;
use http::StatusCode;
use reqwest::Client;
use std::sync::Arc;
@@ -25,24 +26,30 @@ struct EndpointStoragePair {
}
const KEY: &str = "lfc_state";
impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
impl TryFrom<&ComputeSpec> for EndpointStoragePair {
type Error = anyhow::Error;
fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
bail!("pspec.endpoint_id missing")
fn try_from(spec: &ComputeSpec) -> Result<Self, Self::Error> {
let Some(ref addr) = spec.endpoint_storage_addr else {
bail!("spec.endpoint_storage_addr missing")
};
let Some(ref base_uri) = pspec.endpoint_storage_addr else {
bail!("pspec.endpoint_storage_addr missing")
};
let tenant_id = pspec.tenant_id;
let timeline_id = pspec.timeline_id;
let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}");
let Some(ref token) = pspec.endpoint_storage_token else {
bail!("pspec.endpoint_storage_token missing")
let url = format!(
"http://{addr}/{tenant_id}/{timeline_id}/{endpoint_id}/{key}",
addr = addr,
tenant_id = spec.tenant_id,
timeline_id = spec.timeline_id,
endpoint_id = spec.endpoint_id,
key = KEY
);
let Some(ref token) = spec.endpoint_storage_token else {
bail!("spec.endpoint_storage_token missing")
};
let token = token.clone();
Ok(EndpointStoragePair { url, token })
Ok(EndpointStoragePair {
url,
token: token.clone(),
})
}
}
@@ -111,7 +118,7 @@ impl ComputeNode {
fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
let state = self.state.lock().unwrap();
state.pspec.as_ref().unwrap().try_into()
state.spec.as_ref().unwrap().try_into()
}
async fn prewarm_impl(&self) -> Result<()> {

View File

@@ -56,13 +56,24 @@ pub fn write_postgres_conf(
// Add options for connecting to storage
writeln!(file, "# Neon storage settings")?;
if let Some(s) = &spec.pageserver_connstring {
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?;
if !spec.pageservers.is_empty() {
writeln!(
file,
"neon.pageserver_connstring={}",
escape_conf_value(
&spec
.pageservers
.iter()
.map(|p| format!("host={} port={}", p.host, p.port))
.collect::<Vec<_>>()
.join(",")
)
)?;
}
if let Some(stripe_size) = spec.shard_stripe_size {
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
if !spec.safekeeper_connstrings.is_empty() {
if !spec.safekeepers.is_empty() {
let mut neon_safekeepers_value = String::new();
tracing::info!(
"safekeepers_connstrings is not zero, gen: {:?}",
@@ -72,32 +83,45 @@ pub fn write_postgres_conf(
if let Some(generation) = spec.safekeepers_generation {
write!(neon_safekeepers_value, "g#{}:", generation)?;
}
neon_safekeepers_value.push_str(&spec.safekeeper_connstrings.join(","));
neon_safekeepers_value.push_str(
&spec
.safekeepers
.iter()
.map(|s| format!("{}:{}", s.host.to_string(), s.port))
.collect::<Vec<_>>()
.join(","),
);
writeln!(
file,
"neon.safekeepers={}",
escape_conf_value(&neon_safekeepers_value)
)?;
}
if let Some(s) = &spec.tenant_id {
writeln!(file, "neon.tenant_id={}", escape_conf_value(&s.to_string()))?;
}
if let Some(s) = &spec.timeline_id {
writeln!(
file,
"neon.timeline_id={}",
escape_conf_value(&s.to_string())
)?;
}
if let Some(s) = &spec.project_id {
writeln!(file, "neon.project_id={}", escape_conf_value(s))?;
}
if let Some(s) = &spec.branch_id {
writeln!(file, "neon.branch_id={}", escape_conf_value(s))?;
}
if let Some(s) = &spec.endpoint_id {
writeln!(file, "neon.endpoint_id={}", escape_conf_value(s))?;
}
writeln!(
file,
"neon.tenant_id={}",
escape_conf_value(&spec.tenant_id.to_string())
)?;
writeln!(
file,
"neon.timeline_id={}",
escape_conf_value(&spec.timeline_id.to_string())
)?;
writeln!(
file,
"neon.project_id={}",
escape_conf_value(&spec.project_id)
)?;
writeln!(
file,
"neon.branch_id={}",
escape_conf_value(&spec.branch_id)
)?;
writeln!(
file,
"neon.endpoint_id={}",
escape_conf_value(&spec.endpoint_id)
)?;
// tls
if let Some(tls_config) = tls_config {

View File

@@ -8,7 +8,7 @@ use http::StatusCode;
use tokio::task;
use tracing::info;
use crate::compute::{ComputeNode, ParsedSpec};
use crate::compute::ComputeNode;
use crate::http::JsonResponse;
use crate::http::extract::Json;
@@ -22,11 +22,6 @@ pub(in crate::http) async fn configure(
State(compute): State<Arc<ComputeNode>>,
request: Json<ConfigurationRequest>,
) -> Response {
let pspec = match ParsedSpec::try_from(request.0.spec) {
Ok(p) => p,
Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
};
// XXX: wrap state update under lock in a code block. Otherwise, we will try
// to `Send` `mut state` into the spawned thread bellow, which will cause
// the following rustc error:
@@ -43,7 +38,7 @@ pub(in crate::http) async fn configure(
// configure request for tracing purposes.
state.startup_span = Some(tracing::Span::current());
state.pspec = Some(pspec);
state.spec = Some(request.spec.clone());
state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
drop(state);
}

View File

@@ -31,8 +31,7 @@ pub(in crate::http) async fn download_extension(
let ext = {
let state = compute.state.lock().unwrap();
let pspec = state.pspec.as_ref().unwrap();
let spec = &pspec.spec;
let spec = &state.spec.as_ref().unwrap();
let remote_extensions = match spec.remote_extensions.as_ref() {
Some(r) => r,

View File

@@ -21,14 +21,8 @@ impl From<&ComputeState> for ComputeStatusResponse {
fn from(state: &ComputeState) -> Self {
ComputeStatusResponse {
start_time: state.start_time,
tenant: state
.pspec
.as_ref()
.map(|pspec| pspec.tenant_id.to_string()),
timeline: state
.pspec
.as_ref()
.map(|pspec| pspec.timeline_id.to_string()),
tenant: state.spec.as_ref().map(|spec| spec.tenant_id.to_string()),
timeline: state.spec.as_ref().map(|spec| spec.timeline_id.to_string()),
status: state.status,
last_active: state.last_active,
error: state.error.clone(),

View File

@@ -18,8 +18,8 @@ use crate::compute::ComputeNode;
pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
let (tenant_id, timeline_id, lsn) = {
let state = compute.state.lock().unwrap();
let spec = state.pspec.as_ref().expect("Spec must be set");
match spec.spec.mode {
let spec = state.spec.as_ref().expect("Spec must be set");
match spec.mode {
ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn),
_ => return,
}
@@ -58,7 +58,7 @@ fn lsn_lease_bg_task(
"Request succeeded, sleeping for {} seconds",
sleep_duration.as_secs()
);
compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
compute.wait_timeout_while_pageservers_unchanged(sleep_duration);
}
}
@@ -79,18 +79,11 @@ fn acquire_lsn_lease_with_retry(
let configs = {
let state = compute.state.lock().unwrap();
let spec = state.pspec.as_ref().expect("spec must be set");
let spec = state.spec.as_ref().expect("spec must be set");
let conn_strings = spec.pageserver_connstr.split(',');
conn_strings
.map(|connstr| {
let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr");
if let Some(storage_auth_token) = &spec.storage_auth_token {
config.password(storage_auth_token.clone());
}
config
})
spec.pageservers
.iter()
.map(|p| postgres::Config::from(p))
.collect::<Vec<_>>()
};
@@ -105,7 +98,7 @@ fn acquire_lsn_lease_with_retry(
Err(e) => {
warn!("Failed to acquire lsn lease: {e} (attempt {attempts})");
compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_millis(
compute.wait_timeout_while_pageservers_unchanged(Duration::from_millis(
retry_period_ms as u64,
));
retry_period_ms *= 1.5;

View File

@@ -4,7 +4,7 @@ use std::future::Future;
use std::iter::{empty, once};
use std::sync::Arc;
use anyhow::{Context, Result};
use anyhow::Result;
use compute_api::responses::ComputeStatus;
use compute_api::spec::{ComputeAudit, ComputeSpec, Database, PgIdent, Role};
use futures::future::join_all;
@@ -74,7 +74,7 @@ impl ComputeNode {
let mut drop_subscriptions_done = false;
if spec.drop_subscriptions_before_start {
let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
let timeline_id = self.get_timeline_id();
info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);

View File

@@ -37,7 +37,7 @@ pub async fn ping_safekeeper(
// Parse result
info!("done with {}", id);
if let postgres::SimpleQueryMessage::Row(row) = &result[0] {
if let tokio_postgres::SimpleQueryMessage::Row(row) = &result[0] {
use std::str::FromStr;
let response = TimelineStatusResponse::Ok(TimelineStatusOkResponse {
flush_lsn: Lsn::from_str(row.get("flush_lsn").unwrap())?,

View File

@@ -1493,7 +1493,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
let conf = env.get_pageserver_conf(pageserver_id).unwrap();
let parsed = parse_host_port(&conf.listen_pg_addr).expect("Bad config");
(
vec![(parsed.0, parsed.1.unwrap_or(5432))],
vec![compute_api::spec::Pageserver {
host: parsed.0,
port: parsed.1.unwrap_or(5432),
}],
// If caller is telling us what pageserver to use, this is not a tenant which is
// full managed by storage controller, therefore not sharded.
DEFAULT_STRIPE_SIZE,
@@ -1516,11 +1519,11 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.await?;
}
anyhow::Ok((
Host::parse(&shard.listen_pg_addr)
anyhow::Ok(compute_api::spec::Pageserver {
host: Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported bad hostname"),
shard.listen_pg_port,
))
port: shard.listen_pg_port,
})
}),
)
.await?;
@@ -1576,10 +1579,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
let pageservers = if let Some(ps_id) = args.endpoint_pageserver_id {
let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?);
vec![(
pageserver.pg_connection_config.host().clone(),
pageserver.pg_connection_config.port(),
)]
vec![compute_api::spec::Pageserver {
host: pageserver.pg_connection_config.host().clone(),
port: pageserver.pg_connection_config.port(),
}]
} else {
let storage_controller = StorageController::from_env(env);
storage_controller
@@ -1587,12 +1590,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.await?
.shards
.into_iter()
.map(|shard| {
(
Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported malformed host"),
shard.listen_pg_port,
)
.map(|shard| compute_api::spec::Pageserver {
host: Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported malformed host"),
port: shard.listen_pg_port,
})
.collect::<Vec<_>>()
};

View File

@@ -52,8 +52,8 @@ use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TlsConfig,
};
use compute_api::spec::{
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
RemoteExtSpec, Role,
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, Pageserver, PgIdent,
RemoteExtSpec, Role, Safekeeper,
};
use jsonwebtoken::jwk::{
AlgorithmParameters, CommonParameters, EllipticCurve, Jwk, JwkSet, KeyAlgorithm, KeyOperations,
@@ -606,29 +606,25 @@ impl Endpoint {
}
}
fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String {
pageservers
.iter()
.map(|(host, port)| format!("postgresql://no_user@{host}:{port}"))
.collect::<Vec<_>>()
.join(",")
}
fn safekeepers_from_nodes(&self, ids: Vec<NodeId>) -> Result<Vec<Safekeeper>> {
let mut s = Vec::new();
/// Map safekeepers ids to the actual connection strings.
fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
let mut safekeeper_connstrings = Vec::new();
if self.mode == ComputeMode::Primary {
for sk_id in sk_ids {
for id in ids {
let sk = self
.env
.safekeepers
.iter()
.find(|node| node.id == sk_id)
.ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
.find(|node| node.id == id)
.ok_or_else(|| anyhow!("safekeeper {id} does not exist"))?;
s.push(Safekeeper {
host: Host::parse("127.0.0.1")?,
port: sk.get_compute_port(),
});
}
}
Ok(safekeeper_connstrings)
Ok(s)
}
/// Generate a JWT with the correct claims.
@@ -654,7 +650,7 @@ impl Endpoint {
endpoint_storage_addr: String,
safekeepers_generation: Option<SafekeeperGeneration>,
safekeepers: Vec<NodeId>,
pageservers: Vec<(Host, u16)>,
pageservers: Vec<Pageserver>,
remote_ext_base_url: Option<&String>,
shard_stripe_size: usize,
create_test_user: bool,
@@ -672,11 +668,6 @@ impl Endpoint {
std::fs::remove_dir_all(self.pgdata())?;
}
let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
assert!(!pageserver_connstring.is_empty());
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
// check for file remote_extensions_spec.json
// if it is present, read it and pass to compute_ctl
let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json");
@@ -727,15 +718,34 @@ impl Endpoint {
postgresql_conf: Some(postgresql_conf.clone()),
},
delta_operations: None,
tenant_id: Some(self.tenant_id),
timeline_id: Some(self.timeline_id),
project_id: None,
branch_id: None,
endpoint_id: Some(self.endpoint_id.clone()),
tenant_id: self.tenant_id.clone(),
timeline_id: self.timeline_id.clone(),
project_id: self.tenant_id.to_string(),
branch_id: self.timeline_id.to_string(),
endpoint_id: self.endpoint_id.clone(),
mode: self.mode,
pageserver_connstring: Some(pageserver_connstring),
pageservers,
safekeepers: {
let mut s = Vec::new();
if self.mode == ComputeMode::Primary {
for id in safekeepers {
let sk = self
.env
.safekeepers
.iter()
.find(|node| node.id == id)
.ok_or_else(|| anyhow!("safekeeper {id} does not exist"))?;
s.push(Safekeeper {
host: Host::parse("127.0.0.1")?,
port: sk.get_compute_port(),
});
}
}
s
},
safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()),
safekeeper_connstrings,
storage_auth_token: auth_token.clone(),
remote_extensions,
pgbouncer_settings: None,
@@ -939,7 +949,7 @@ impl Endpoint {
pub async fn reconfigure(
&self,
mut pageservers: Vec<(Host, u16)>,
pageservers: Vec<Pageserver>,
stripe_size: Option<ShardStripeSize>,
safekeepers: Option<Vec<NodeId>>,
) -> Result<()> {
@@ -958,30 +968,24 @@ impl Endpoint {
if pageservers.is_empty() {
let storage_controller = StorageController::from_env(&self.env);
let locate_result = storage_controller.tenant_locate(self.tenant_id).await?;
pageservers = locate_result
spec.pageservers = locate_result
.shards
.into_iter()
.map(|shard| {
(
Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported bad hostname"),
shard.listen_pg_port,
)
.map(|shard| Pageserver {
host: Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported bad hostname"),
port: shard.listen_pg_port,
})
.collect::<Vec<_>>();
}
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
assert!(!pageserver_connstr.is_empty());
spec.pageserver_connstring = Some(pageserver_connstr);
if stripe_size.is_some() {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
}
// If safekeepers are not specified, don't change them.
if let Some(safekeepers) = safekeepers {
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
spec.safekeeper_connstrings = safekeeper_connstrings;
spec.safekeepers = self.safekeepers_from_nodes(safekeepers)?;
}
let client = reqwest::Client::builder()

View File

@@ -9,8 +9,11 @@ anyhow.workspace = true
chrono.workspace = true
indexmap.workspace = true
jsonwebtoken.workspace = true
postgres.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio-postgres.workspace = true
url.workspace = true
regex.workspace = true
utils = { path = "../utils" }

View File

@@ -9,7 +9,8 @@ use indexmap::IndexMap;
use regex::Regex;
use remote_storage::RemotePath;
use serde::{Deserialize, Serialize};
use utils::id::{TenantId, TimelineId};
use url::Host;
use utils::id::{BranchId, EndpointId, ProjectId, TenantId, TimelineId};
use utils::lsn::Lsn;
use crate::responses::TlsConfig;
@@ -21,13 +22,77 @@ pub type PgIdent = String;
/// String type alias representing Postgres extension version
pub type ExtVersion = String;
/// Pageserver settings.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct Pageserver {
/// Hostname of the pageserver.
pub host: Host,
/// Port that the safekeeper listens on.
pub port: u16,
}
impl From<&Pageserver> for postgres::Config {
fn from(ps: &Pageserver) -> Self {
let mut config = postgres::Config::new();
config.host(&ps.host.to_string());
config.port(ps.port);
config
}
}
impl From<&Pageserver> for tokio_postgres::Config {
fn from(ps: &Pageserver) -> Self {
let mut config = tokio_postgres::Config::new();
config.host(&ps.host.to_string());
config.port(ps.port);
config
}
}
/// Safekeeper settings.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct Safekeeper {
/// Hostname of the safekeeper.
pub host: Host,
/// Port that the safekeeper listens on.
pub port: u16,
}
impl From<&Safekeeper> for postgres::Config {
fn from(sk: &Safekeeper) -> Self {
let mut config = postgres::Config::new();
config.host(&sk.host.to_string());
config.port(sk.port);
config
}
}
impl From<&Safekeeper> for tokio_postgres::Config {
fn from(sk: &Safekeeper) -> Self {
let mut config = tokio_postgres::Config::new();
config.host(&sk.host.to_string());
config.port(sk.port);
config
}
}
fn default_reconfigure_concurrency() -> usize {
1
}
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ComputeSpec {
pub format_version: f32,
@@ -90,25 +155,13 @@ pub struct ComputeSpec {
// Information needed to connect to the storage layer.
//
// `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.
//
// Depending on `mode`, this can be a primary read-write node, a read-only
// replica, or a read-only node pinned at an older LSN.
// `safekeeper_connstrings` must be set for a primary.
//
// For backwards compatibility, the control plane may leave out all of
// these, and instead set the "neon.tenant_id", "neon.timeline_id",
// etc. GUCs in cluster.settings. TODO: Once the control plane has been
// updated to fill these fields, we can make these non optional.
pub tenant_id: Option<TenantId>,
pub timeline_id: Option<TimelineId>,
pub pageserver_connstring: Option<String>,
pub pageservers: Vec<Pageserver>,
// More neon ids that we expose to the compute_ctl
// and to postgres as neon extension GUCs.
pub project_id: Option<String>,
pub branch_id: Option<String>,
pub endpoint_id: Option<String>,
#[serde(default)]
pub safekeepers_generation: Option<u32>,
/// Safekeeper membership config generation. It is put in
/// neon.safekeepers GUC and serves two purposes:
@@ -120,9 +173,18 @@ pub struct ComputeSpec {
/// Note: it could be SafekeeperGeneration, but this needs linking
/// compute_ctl with postgres_ffi.
#[serde(default)]
pub safekeepers_generation: Option<u32>,
#[serde(default)]
pub safekeeper_connstrings: Vec<String>,
pub safekeepers: Vec<Safekeeper>,
/// The Neon tenant ID. Exposed to Postgres as `neon.tenant_id`.
pub tenant_id: TenantId,
/// The Neon timeline ID. Exposed to Postgres as `neon.timeline_id`.
pub timeline_id: TimelineId,
/// The Neon project ID. Exposed to Postgres as `neon.project_id`.
pub project_id: ProjectId,
/// The Neon branch ID. Exposed to Postgres as `neon.branch_id`.
pub branch_id: BranchId,
/// The Neon endpoint ID. Exposed to Postgres as `neon.endpoint_id`.
pub endpoint_id: EndpointId,
#[serde(default)]
pub mode: ComputeMode,

View File

@@ -295,7 +295,11 @@ pub struct TenantId(Id);
id_newtype!(TenantId);
/// If needed, reuse small string from proxy/src/types.rc
/// Type representing a project ID.
pub type ProjectId = String;
/// Type representing a branch ID.
pub type BranchId = String;
/// Type representing an endpoint ID.
pub type EndpointId = String;
// A pair uniquely identifying Neon instance.

View File

@@ -65,8 +65,9 @@ diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connec
diesel_migrations = { version = "2.2.0" }
scoped-futures = "0.1.4"
compute_api = { path = "../libs/compute_api/" }
http-utils = { path = "../libs/http-utils/" }
utils = { path = "../libs/utils/" }
metrics = { path = "../libs/metrics/" }
control_plane = { path = "../control_plane" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -428,7 +428,10 @@ impl ComputeHook {
.expect("Unknown pageserver");
let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
.expect("Unable to parse listen_pg_addr");
(pg_host, pg_port.unwrap_or(5432))
compute_api::spec::Pageserver {
host: pg_host,
port: pg_port.unwrap_or(5432),
}
})
.collect::<Vec<_>>();