Consolidate compute_ctl configuration structures (#11514)

Previously, the structure of the spec file was just the compute spec.
However, the response from the control plane get spec request included
the compute spec and the compute_ctl config. This divergence was
hindering other work such as adding regression tests for compute_ctl
HTTP authorization.

Signed-off-by: Tristan Partin <tristan@neon.tech>
This commit is contained in:
Tristan Partin
2025-04-11 10:06:29 -05:00
committed by GitHub
parent c66444ea15
commit ff5a527167
14 changed files with 491 additions and 412 deletions

View File

@@ -29,13 +29,12 @@
//! ```sh
//! compute_ctl -D /var/db/postgres/compute \
//! -C 'postgresql://cloud_admin@localhost/postgres' \
//! -S /var/db/postgres/specs/current.json \
//! -c /var/db/postgres/configs/config.json \
//! -b /usr/local/bin/postgres \
//! -r http://pg-ext-s3-gateway \
//! ```
use std::ffi::OsString;
use std::fs::File;
use std::path::Path;
use std::process::exit;
use std::sync::mpsc;
use std::thread;
@@ -43,8 +42,7 @@ use std::time::Duration;
use anyhow::{Context, Result};
use clap::Parser;
use compute_api::responses::ComputeCtlConfig;
use compute_api::spec::ComputeSpec;
use compute_api::responses::ComputeConfig;
use compute_tools::compute::{
BUILD_TAG, ComputeNode, ComputeNodeParams, forward_termination_signal,
};
@@ -118,8 +116,10 @@ struct Cli {
#[arg(long)]
pub set_disk_quota_for_fs: Option<String>,
#[arg(short = 'S', long, group = "spec-path")]
pub spec_path: Option<OsString>,
// TODO(tristan957): remove alias after compatibility tests are no longer
// an issue
#[arg(short = 'c', long, alias = "spec-path")]
pub config: Option<OsString>,
#[arg(short = 'i', long, group = "compute-id")]
pub compute_id: String,
@@ -127,8 +127,9 @@ struct Cli {
#[arg(
short = 'p',
long,
conflicts_with = "spec-path",
value_name = "CONTROL_PLANE_API_BASE_URL"
conflicts_with = "config",
value_name = "CONTROL_PLANE_API_BASE_URL",
requires = "compute-id"
)]
pub control_plane_uri: Option<String>,
}
@@ -154,7 +155,7 @@ fn main() -> Result<()> {
let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
let cli_spec = try_spec_from_cli(&cli)?;
let cli_spec = get_config(&cli)?;
let compute_node = ComputeNode::new(
ComputeNodeParams {
@@ -201,27 +202,17 @@ async fn init() -> Result<()> {
Ok(())
}
fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
// First, read spec from the path if provided
if let Some(ref spec_path) = cli.spec_path {
let file = File::open(Path::new(spec_path))?;
return Ok(CliSpecParams {
spec: Some(serde_json::from_reader(file)?),
compute_ctl_config: ComputeCtlConfig::default(),
});
fn get_config(cli: &Cli) -> Result<ComputeConfig> {
// First, read the config from the path if provided
if let Some(ref config) = cli.config {
let file = File::open(config)?;
return Ok(serde_json::from_reader(&file)?);
}
if cli.control_plane_uri.is_none() {
panic!("must specify --control-plane-uri");
};
// If the spec wasn't provided in the CLI arguments, then retrieve it from
// If the config wasn't provided in the CLI arguments, then retrieve it from
// the control plane
match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
Ok(resp) => Ok(CliSpecParams {
spec: resp.0,
compute_ctl_config: resp.1,
}),
match get_config_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
Ok(config) => Ok(config),
Err(e) => {
error!(
"cannot get response from control plane: {}\n\
@@ -233,13 +224,6 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
}
}
struct CliSpecParams {
/// If a spec was provided via CLI or file, the [`ComputeSpec`]
spec: Option<ComputeSpec>,
#[allow(dead_code)]
compute_ctl_config: ComputeCtlConfig,
}
fn deinit_and_exit(exit_code: Option<i32>) -> ! {
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit. Shutting down OTEL tracing provider may

View File

@@ -19,13 +19,13 @@ pub(crate) static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
// but for all our APIs we defined a 'slug'/method/operationId in the OpenAPI spec.
// And it's fair to call it a 'RPC' (Remote Procedure Call).
pub enum CPlaneRequestRPC {
GetSpec,
GetConfig,
}
impl CPlaneRequestRPC {
pub fn as_str(&self) -> &str {
match self {
CPlaneRequestRPC::GetSpec => "GetSpec",
CPlaneRequestRPC::GetConfig => "GetConfig",
}
}
}

View File

@@ -3,9 +3,8 @@ use std::path::Path;
use anyhow::{Result, anyhow, bail};
use compute_api::responses::{
ComputeCtlConfig, ControlPlaneComputeStatus, ControlPlaneSpecResponse,
ComputeConfig, ControlPlaneComputeStatus, ControlPlaneConfigResponse,
};
use compute_api::spec::ComputeSpec;
use reqwest::StatusCode;
use tokio_postgres::Client;
use tracing::{error, info, instrument};
@@ -21,7 +20,7 @@ use crate::params::PG_HBA_ALL_MD5;
fn do_control_plane_request(
uri: &str,
jwt: &str,
) -> Result<ControlPlaneSpecResponse, (bool, String, String)> {
) -> Result<ControlPlaneConfigResponse, (bool, String, String)> {
let resp = reqwest::blocking::Client::new()
.get(uri)
.header("Authorization", format!("Bearer {}", jwt))
@@ -29,14 +28,14 @@ fn do_control_plane_request(
.map_err(|e| {
(
true,
format!("could not perform spec request to control plane: {:?}", e),
format!("could not perform request to control plane: {:?}", e),
UNKNOWN_HTTP_STATUS.to_string(),
)
})?;
let status = resp.status();
match status {
StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
StatusCode::OK => match resp.json::<ControlPlaneConfigResponse>() {
Ok(spec_resp) => Ok(spec_resp),
Err(e) => Err((
true,
@@ -69,40 +68,35 @@ fn do_control_plane_request(
}
}
/// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
/// env variable is set, it will be used for authorization.
pub fn get_spec_from_control_plane(
base_uri: &str,
compute_id: &str,
) -> Result<(Option<ComputeSpec>, ComputeCtlConfig)> {
/// Request config from the control-plane by compute_id. If
/// `NEON_CONTROL_PLANE_TOKEN` env variable is set, it will be used for
/// authorization.
pub fn get_config_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeConfig> {
let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
Ok(v) => v,
Err(_) => "".to_string(),
};
let jwt: String = std::env::var("NEON_CONTROL_PLANE_TOKEN").unwrap_or_default();
let mut attempt = 1;
info!("getting spec from control plane: {}", cp_uri);
info!("getting config from control plane: {}", cp_uri);
// Do 3 attempts to get spec from the control plane using the following logic:
// - network error -> then retry
// - compute id is unknown or any other error -> bail out
// - no spec for compute yet (Empty state) -> return Ok(None)
// - got spec -> return Ok(Some(spec))
// - got config -> return Ok(Some(config))
while attempt < 4 {
let result = match do_control_plane_request(&cp_uri, &jwt) {
Ok(spec_resp) => {
Ok(config_resp) => {
CPLANE_REQUESTS_TOTAL
.with_label_values(&[
CPlaneRequestRPC::GetSpec.as_str(),
CPlaneRequestRPC::GetConfig.as_str(),
&StatusCode::OK.to_string(),
])
.inc();
match spec_resp.status {
ControlPlaneComputeStatus::Empty => Ok((None, spec_resp.compute_ctl_config)),
match config_resp.status {
ControlPlaneComputeStatus::Empty => Ok(config_resp.into()),
ControlPlaneComputeStatus::Attached => {
if let Some(spec) = spec_resp.spec {
Ok((Some(spec), spec_resp.compute_ctl_config))
if config_resp.spec.is_some() {
Ok(config_resp.into())
} else {
bail!("compute is attached, but spec is empty")
}
@@ -111,7 +105,7 @@ pub fn get_spec_from_control_plane(
}
Err((retry, msg, status)) => {
CPLANE_REQUESTS_TOTAL
.with_label_values(&[CPlaneRequestRPC::GetSpec.as_str(), &status])
.with_label_values(&[CPlaneRequestRPC::GetConfig.as_str(), &status])
.inc();
if retry {
Err(anyhow!(msg))
@@ -122,7 +116,7 @@ pub fn get_spec_from_control_plane(
};
if let Err(e) = &result {
error!("attempt {} to get spec failed with: {}", attempt, e);
error!("attempt {} to get config failed with: {}", attempt, e);
} else {
return result;
}
@@ -133,13 +127,13 @@ pub fn get_spec_from_control_plane(
// All attempts failed, return error.
Err(anyhow::anyhow!(
"Exhausted all attempts to retrieve the spec from the control plane"
"Exhausted all attempts to retrieve the config from the control plane"
))
}
/// Check `pg_hba.conf` and update if needed to allow external connections.
pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
// XXX: consider making it a part of spec.json
// XXX: consider making it a part of config.json
let pghba_path = pgdata_path.join("pg_hba.conf");
if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
@@ -153,7 +147,7 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
/// Create a standby.signal file
pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
// XXX: consider making it a part of spec.json
// XXX: consider making it a part of config.json
let signalfile = pgdata_path.join("standby.signal");
if !signalfile.exists() {

View File

@@ -29,7 +29,7 @@
//! compute.log - log output of `compute_ctl` and `postgres`
//! endpoint.json - serialized `EndpointConf` struct
//! postgresql.conf - postgresql settings
//! spec.json - passed to `compute_ctl`
//! config.json - passed to `compute_ctl`
//! pgdata/
//! postgresql.conf - copy of postgresql.conf created by `compute_ctl`
//! zenith.signal
@@ -46,7 +46,9 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use anyhow::{Context, Result, anyhow, bail};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeCtlConfig, ComputeStatus, ComputeStatusResponse};
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse,
};
use compute_api::spec::{
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
RemoteExtSpec, Role,
@@ -619,90 +621,101 @@ impl Endpoint {
remote_extensions = None;
};
// Create spec file
let mut spec = ComputeSpec {
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
format_version: 1.0,
operation_uuid: None,
features: self.features.clone(),
swap_size_bytes: None,
disk_quota_bytes: None,
disable_lfc_resizing: None,
cluster: Cluster {
cluster_id: None, // project ID: not used
name: None, // project name: not used
state: None,
roles: if create_test_user {
vec![Role {
// Create config file
let config = {
let mut spec = ComputeSpec {
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
format_version: 1.0,
operation_uuid: None,
features: self.features.clone(),
swap_size_bytes: None,
disk_quota_bytes: None,
disable_lfc_resizing: None,
cluster: Cluster {
cluster_id: None, // project ID: not used
name: None, // project name: not used
state: None,
roles: if create_test_user {
vec![Role {
name: PgIdent::from_str("test").unwrap(),
encrypted_password: None,
options: None,
}]
} else {
Vec::new()
},
databases: if create_test_user {
vec![Database {
name: PgIdent::from_str("neondb").unwrap(),
owner: PgIdent::from_str("test").unwrap(),
options: None,
restrict_conn: false,
invalid: false,
}]
} else {
Vec::new()
},
settings: None,
postgresql_conf: Some(postgresql_conf.clone()),
},
delta_operations: None,
tenant_id: Some(self.tenant_id),
timeline_id: Some(self.timeline_id),
project_id: None,
branch_id: None,
endpoint_id: Some(self.endpoint_id.clone()),
mode: self.mode,
pageserver_connstring: Some(pageserver_connstring),
safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()),
safekeeper_connstrings,
storage_auth_token: auth_token.clone(),
remote_extensions,
pgbouncer_settings: None,
shard_stripe_size: Some(shard_stripe_size),
local_proxy_config: None,
reconfigure_concurrency: self.reconfigure_concurrency,
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
audit_log_level: ComputeAudit::Disabled,
logs_export_host: None::<String>,
};
// this strange code is needed to support respec() in tests
if self.cluster.is_some() {
debug!("Cluster is already set in the endpoint spec, using it");
spec.cluster = self.cluster.clone().unwrap();
debug!("spec.cluster {:?}", spec.cluster);
// fill missing fields again
if create_test_user {
spec.cluster.roles.push(Role {
name: PgIdent::from_str("test").unwrap(),
encrypted_password: None,
options: None,
}]
} else {
Vec::new()
},
databases: if create_test_user {
vec![Database {
});
spec.cluster.databases.push(Database {
name: PgIdent::from_str("neondb").unwrap(),
owner: PgIdent::from_str("test").unwrap(),
options: None,
restrict_conn: false,
invalid: false,
}]
} else {
Vec::new()
},
settings: None,
postgresql_conf: Some(postgresql_conf.clone()),
},
delta_operations: None,
tenant_id: Some(self.tenant_id),
timeline_id: Some(self.timeline_id),
project_id: None,
branch_id: None,
endpoint_id: Some(self.endpoint_id.clone()),
mode: self.mode,
pageserver_connstring: Some(pageserver_connstring),
safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()),
safekeeper_connstrings,
storage_auth_token: auth_token.clone(),
remote_extensions,
pgbouncer_settings: None,
shard_stripe_size: Some(shard_stripe_size),
local_proxy_config: None,
reconfigure_concurrency: self.reconfigure_concurrency,
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
audit_log_level: ComputeAudit::Disabled,
logs_export_host: None::<String>,
});
}
spec.cluster.postgresql_conf = Some(postgresql_conf);
}
ComputeConfig {
spec: Some(spec),
compute_ctl_config: ComputeCtlConfig::default(),
}
};
// this strange code is needed to support respec() in tests
if self.cluster.is_some() {
debug!("Cluster is already set in the endpoint spec, using it");
spec.cluster = self.cluster.clone().unwrap();
debug!("spec.cluster {:?}", spec.cluster);
// fill missing fields again
if create_test_user {
spec.cluster.roles.push(Role {
name: PgIdent::from_str("test").unwrap(),
encrypted_password: None,
options: None,
});
spec.cluster.databases.push(Database {
name: PgIdent::from_str("neondb").unwrap(),
owner: PgIdent::from_str("test").unwrap(),
options: None,
restrict_conn: false,
invalid: false,
});
}
spec.cluster.postgresql_conf = Some(postgresql_conf);
}
// TODO(tristan957): Remove the write to spec.json after compatibility
// tests work themselves out
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
std::fs::write(spec_path, serde_json::to_string_pretty(&config.spec)?)?;
let config_path = self.endpoint_path().join("config.json");
std::fs::write(config_path, serde_json::to_string_pretty(&config)?)?;
// Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
let logfile = std::fs::OpenOptions::new()
@@ -710,6 +723,16 @@ impl Endpoint {
.append(true)
.open(self.endpoint_path().join("compute.log"))?;
// TODO(tristan957): Remove when compatibility tests are no longer an
// issue
let old_compute_ctl = {
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
let help_output = cmd.arg("--help").output()?;
let help_output = String::from_utf8_lossy(&help_output.stdout);
!help_output.contains("--config")
};
// Launch compute_ctl
let conn_str = self.connstr("cloud_admin", "postgres");
println!("Starting postgres node at '{}'", conn_str);
@@ -728,9 +751,18 @@ impl Endpoint {
])
.args(["--pgdata", self.pgdata().to_str().unwrap()])
.args(["--connstr", &conn_str])
// TODO(tristan957): Change this to --config when compatibility tests
// are no longer an issue
.args([
"--spec-path",
self.endpoint_path().join("spec.json").to_str().unwrap(),
self.endpoint_path()
.join(if old_compute_ctl {
"spec.json"
} else {
"config.json"
})
.to_str()
.unwrap(),
])
.args([
"--pgbin",
@@ -873,10 +905,12 @@ impl Endpoint {
stripe_size: Option<ShardStripeSize>,
safekeepers: Option<Vec<NodeId>>,
) -> Result<()> {
let mut spec: ComputeSpec = {
let spec_path = self.endpoint_path().join("spec.json");
let file = std::fs::File::open(spec_path)?;
serde_json::from_reader(file)?
let (mut spec, compute_ctl_config) = {
let config_path = self.endpoint_path().join("config.json");
let file = std::fs::File::open(config_path)?;
let config: ComputeConfig = serde_json::from_reader(file)?;
(config.spec.unwrap(), config.compute_ctl_config)
};
let postgresql_conf = self.read_postgresql_conf()?;
@@ -926,7 +960,7 @@ impl Endpoint {
.body(
serde_json::to_string(&ConfigurationRequest {
spec,
compute_ctl_config: ComputeCtlConfig::default(),
compute_ctl_config,
})
.unwrap(),
)

View File

@@ -11,8 +11,8 @@ generate_id() {
PG_VERSION=${PG_VERSION:-14}
SPEC_FILE_ORG=/var/db/postgres/specs/spec.json
SPEC_FILE=/tmp/spec.json
CONFIG_FILE_ORG=/var/db/postgres/configs/config.json
CONFIG_FILE=/tmp/config.json
echo "Waiting pageserver become ready."
while ! nc -z pageserver 6400; do
@@ -20,7 +20,7 @@ while ! nc -z pageserver 6400; do
done
echo "Page server is ready."
cp ${SPEC_FILE_ORG} ${SPEC_FILE}
cp ${CONFIG_FILE_ORG} ${CONFIG_FILE}
if [ -n "${TENANT_ID:-}" ] && [ -n "${TIMELINE_ID:-}" ]; then
tenant_id=${TENANT_ID}
@@ -73,17 +73,27 @@ else
ulid_extension=ulid
fi
echo "Adding pgx_ulid"
shared_libraries=$(jq -r '.cluster.settings[] | select(.name=="shared_preload_libraries").value' ${SPEC_FILE})
sed -i "s/${shared_libraries}/${shared_libraries},${ulid_extension}/" ${SPEC_FILE}
shared_libraries=$(jq -r '.spec.cluster.settings[] | select(.name=="shared_preload_libraries").value' ${CONFIG_FILE})
sed -i "s/${shared_libraries}/${shared_libraries},${ulid_extension}/" ${CONFIG_FILE}
echo "Overwrite tenant id and timeline id in spec file"
sed -i "s/TENANT_ID/${tenant_id}/" ${SPEC_FILE}
sed -i "s/TIMELINE_ID/${timeline_id}/" ${SPEC_FILE}
sed -i "s/TENANT_ID/${tenant_id}/" ${CONFIG_FILE}
sed -i "s/TIMELINE_ID/${timeline_id}/" ${CONFIG_FILE}
cat ${SPEC_FILE}
cat ${CONFIG_FILE}
# TODO(tristan957): Remove these workarounds for backwards compatibility after
# the next compute release. That includes these next few lines and the
# --spec-path in the compute_ctl invocation.
if compute_ctl --help | grep --quiet -- '--config'; then
SPEC_PATH="$CONFIG_FILE"
else
jq '.spec' < "$CONFIG_FILE" > /tmp/spec.json
SPEC_PATH=/tmp/spec.json
fi
echo "Start compute node"
/usr/local/bin/compute_ctl --pgdata /var/db/postgres/compute \
-C "postgresql://cloud_admin@localhost:55433/postgres" \
-b /usr/local/bin/postgres \
--compute-id "compute-$RANDOM" \
-S ${SPEC_FILE}
--spec-path "$SPEC_PATH"

View File

@@ -0,0 +1,148 @@
{
"spec": {
"format_version": 1.0,
"timestamp": "2022-10-12T18:00:00.000Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8c",
"cluster": {
"cluster_id": "docker_compose",
"name": "docker_compose_test",
"state": "restarted",
"roles": [
{
"name": "cloud_admin",
"encrypted_password": "b093c0d3b281ba6da1eacc608620abd8",
"options": null
}
],
"databases": [
],
"settings": [
{
"name": "fsync",
"value": "off",
"vartype": "bool"
},
{
"name": "wal_level",
"value": "logical",
"vartype": "enum"
},
{
"name": "wal_log_hints",
"value": "on",
"vartype": "bool"
},
{
"name": "log_connections",
"value": "on",
"vartype": "bool"
},
{
"name": "port",
"value": "55433",
"vartype": "integer"
},
{
"name": "shared_buffers",
"value": "1MB",
"vartype": "string"
},
{
"name": "max_connections",
"value": "100",
"vartype": "integer"
},
{
"name": "listen_addresses",
"value": "0.0.0.0",
"vartype": "string"
},
{
"name": "max_wal_senders",
"value": "10",
"vartype": "integer"
},
{
"name": "max_replication_slots",
"value": "10",
"vartype": "integer"
},
{
"name": "wal_sender_timeout",
"value": "5s",
"vartype": "string"
},
{
"name": "wal_keep_size",
"value": "0",
"vartype": "integer"
},
{
"name": "password_encryption",
"value": "md5",
"vartype": "enum"
},
{
"name": "restart_after_crash",
"value": "off",
"vartype": "bool"
},
{
"name": "synchronous_standby_names",
"value": "walproposer",
"vartype": "string"
},
{
"name": "shared_preload_libraries",
"value": "neon,pg_cron,timescaledb,pg_stat_statements",
"vartype": "string"
},
{
"name": "neon.safekeepers",
"value": "safekeeper1:5454,safekeeper2:5454,safekeeper3:5454",
"vartype": "string"
},
{
"name": "neon.timeline_id",
"value": "TIMELINE_ID",
"vartype": "string"
},
{
"name": "neon.tenant_id",
"value": "TENANT_ID",
"vartype": "string"
},
{
"name": "neon.pageserver_connstring",
"value": "host=pageserver port=6400",
"vartype": "string"
},
{
"name": "max_replication_write_lag",
"value": "500MB",
"vartype": "string"
},
{
"name": "max_replication_flush_lag",
"value": "10GB",
"vartype": "string"
},
{
"name": "cron.database",
"value": "postgres",
"vartype": "string"
}
]
},
"delta_operations": [
]
},
"compute_ctl_config": {
"jwks": {
"keys": []
}
}
}

View File

@@ -1,141 +0,0 @@
{
"format_version": 1.0,
"timestamp": "2022-10-12T18:00:00.000Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8c",
"cluster": {
"cluster_id": "docker_compose",
"name": "docker_compose_test",
"state": "restarted",
"roles": [
{
"name": "cloud_admin",
"encrypted_password": "b093c0d3b281ba6da1eacc608620abd8",
"options": null
}
],
"databases": [
],
"settings": [
{
"name": "fsync",
"value": "off",
"vartype": "bool"
},
{
"name": "wal_level",
"value": "logical",
"vartype": "enum"
},
{
"name": "wal_log_hints",
"value": "on",
"vartype": "bool"
},
{
"name": "log_connections",
"value": "on",
"vartype": "bool"
},
{
"name": "port",
"value": "55433",
"vartype": "integer"
},
{
"name": "shared_buffers",
"value": "1MB",
"vartype": "string"
},
{
"name": "max_connections",
"value": "100",
"vartype": "integer"
},
{
"name": "listen_addresses",
"value": "0.0.0.0",
"vartype": "string"
},
{
"name": "max_wal_senders",
"value": "10",
"vartype": "integer"
},
{
"name": "max_replication_slots",
"value": "10",
"vartype": "integer"
},
{
"name": "wal_sender_timeout",
"value": "5s",
"vartype": "string"
},
{
"name": "wal_keep_size",
"value": "0",
"vartype": "integer"
},
{
"name": "password_encryption",
"value": "md5",
"vartype": "enum"
},
{
"name": "restart_after_crash",
"value": "off",
"vartype": "bool"
},
{
"name": "synchronous_standby_names",
"value": "walproposer",
"vartype": "string"
},
{
"name": "shared_preload_libraries",
"value": "neon,pg_cron,timescaledb,pg_stat_statements",
"vartype": "string"
},
{
"name": "neon.safekeepers",
"value": "safekeeper1:5454,safekeeper2:5454,safekeeper3:5454",
"vartype": "string"
},
{
"name": "neon.timeline_id",
"value": "TIMELINE_ID",
"vartype": "string"
},
{
"name": "neon.tenant_id",
"value": "TENANT_ID",
"vartype": "string"
},
{
"name": "neon.pageserver_connstring",
"value": "host=pageserver port=6400",
"vartype": "string"
},
{
"name": "max_replication_write_lag",
"value": "500MB",
"vartype": "string"
},
{
"name": "max_replication_flush_lag",
"value": "10GB",
"vartype": "string"
},
{
"name": "cron.database",
"value": "postgres",
"vartype": "string"
}
]
},
"delta_operations": [
]
}

View File

@@ -159,7 +159,7 @@ services:
#- RUST_BACKTRACE=1
# Mount the test files directly, for faster editing cycle.
volumes:
- ./compute_wrapper/var/db/postgres/specs/:/var/db/postgres/specs/
- ./compute_wrapper/var/db/postgres/configs/:/var/db/postgres/configs/
- ./compute_wrapper/shell/:/shell/
ports:
- 55433:55433 # pg protocol handler

View File

@@ -14,6 +14,32 @@ pub struct GenericAPIError {
pub error: String,
}
/// All configuration parameters necessary for a compute. When
/// [`ComputeConfig::spec`] is provided, it means that the compute is attached
/// to a tenant. [`ComputeConfig::compute_ctl_config`] will always be provided
/// and contains parameters necessary for operating `compute_ctl` independently
/// of whether a tenant is attached to the compute or not.
///
/// This also happens to be the body of `compute_ctl`'s /configure request.
#[derive(Debug, Deserialize, Serialize)]
pub struct ComputeConfig {
/// The compute spec
pub spec: Option<ComputeSpec>,
/// The compute_ctl configuration
#[allow(dead_code)]
pub compute_ctl_config: ComputeCtlConfig,
}
impl From<ControlPlaneConfigResponse> for ComputeConfig {
fn from(value: ControlPlaneConfigResponse) -> Self {
Self {
spec: value.spec,
compute_ctl_config: value.compute_ctl_config,
}
}
}
#[derive(Debug, Clone, Serialize)]
pub struct ExtensionInstallResponse {
pub extension: PgIdent,
@@ -161,7 +187,7 @@ pub struct TlsConfig {
/// Response of the `/computes/{compute_id}/spec` control-plane API.
#[derive(Deserialize, Debug)]
pub struct ControlPlaneSpecResponse {
pub struct ControlPlaneConfigResponse {
pub spec: Option<ComputeSpec>,
pub status: ControlPlaneComputeStatus,
pub compute_ctl_config: ComputeCtlConfig,

View File

@@ -1,8 +1,8 @@
//! `ComputeSpec` represents the contents of the spec.json file.
//!
//! The spec.json file is used to pass information to 'compute_ctl'. It contains
//! all the information needed to start up the right version of PostgreSQL,
//! and connect it to the storage nodes.
//! The ComputeSpec contains all the information needed to start up
//! the right version of PostgreSQL, and connect it to the storage nodes.
//! It can be passed as part of the `config.json`, or the control plane can
//! provide it by calling the compute_ctl's `/compute_ctl` endpoint, or
//! compute_ctl can fetch it by calling the control plane's API.
use std::collections::HashMap;
use indexmap::IndexMap;

View File

@@ -4302,10 +4302,10 @@ class Endpoint(PgProtocol, LogUtils):
def respec_deep(self, **kwargs: Any) -> None:
"""
Update the spec.json file taking into account nested keys.
Distinct method from respec() to not break existing functionality.
NOTE: This method also updates the spec.json file, not endpoint.json.
We need it because neon_local also writes to spec.json, so intended
Update the endpoint.json file taking into account nested keys.
Distinct method from respec() to do not break existing functionality.
NOTE: This method also updates the config.json file, not endpoint.json.
We need it because neon_local also writes to config.json, so intended
use-case is i) start endpoint with some config, ii) respec_deep(),
iii) call reconfigure() to apply the changes.
"""
@@ -4318,17 +4318,17 @@ class Endpoint(PgProtocol, LogUtils):
curr[k] = v
return curr
config_path = os.path.join(self.endpoint_path(), "spec.json")
config_path = os.path.join(self.endpoint_path(), "config.json")
with open(config_path) as f:
data_dict: dict[str, Any] = json.load(f)
config: dict[str, Any] = json.load(f)
log.debug("Current compute spec: %s", json.dumps(data_dict, indent=4))
log.debug("Current compute config: %s", json.dumps(config, indent=4))
update(data_dict, kwargs)
update(config, kwargs)
with open(config_path, "w") as file:
log.debug("Updating compute spec to: %s", json.dumps(data_dict, indent=4))
json.dump(data_dict, file, indent=4)
log.debug("Updating compute config to: %s", json.dumps(config, indent=4))
json.dump(config, file, indent=4)
def wait_for_migrations(self, wait_for: int = NUM_COMPUTE_MIGRATIONS) -> None:
"""
@@ -4345,7 +4345,7 @@ class Endpoint(PgProtocol, LogUtils):
wait_until(check_migrations_done)
# Mock the extension part of spec passed from control plane for local testing
# endpooint.rs adds content of this file as a part of the spec.json
# endpooint.rs adds content of this file as a part of the config.json
def create_remote_extension_spec(self, spec: dict[str, Any]):
"""Create a remote extension spec file for the endpoint."""
remote_extensions_spec_path = os.path.join(

View File

@@ -90,10 +90,12 @@ def test_compute_catalog(neon_simple_env: NeonEnv):
# and reconfigure the endpoint to create some test databases.
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": False,
"cluster": {
"roles": TEST_ROLE_NAMES,
"databases": TEST_DB_NAMES,
"spec": {
"skip_pg_catalog_updates": False,
"cluster": {
"roles": TEST_ROLE_NAMES,
"databases": TEST_DB_NAMES,
},
},
}
)
@@ -155,10 +157,12 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
# and reconfigure the endpoint to apply the changes.
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": False,
"cluster": {
"roles": TEST_ROLE_NAMES,
"databases": TEST_DB_NAMES,
"spec": {
"skip_pg_catalog_updates": False,
"cluster": {
"roles": TEST_ROLE_NAMES,
"databases": TEST_DB_NAMES,
},
},
}
)
@@ -196,12 +200,14 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": False,
"cluster": {
"roles": [],
"databases": [],
"spec": {
"skip_pg_catalog_updates": False,
"cluster": {
"roles": [],
"databases": [],
},
"delta_operations": delta_operations,
},
"delta_operations": delta_operations,
}
)
endpoint.reconfigure()
@@ -250,9 +256,11 @@ def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
# and reconfigure the endpoint to apply the changes.
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": False,
"cluster": {
"databases": TEST_DB_NAMES,
"spec": {
"skip_pg_catalog_updates": False,
"cluster": {
"databases": TEST_DB_NAMES,
},
},
}
)
@@ -306,17 +314,19 @@ def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
# and reconfigure the endpoint to apply the changes.
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": False,
"cluster": {
"databases": TEST_DB_NAMES_NEW,
"spec": {
"skip_pg_catalog_updates": False,
"cluster": {
"databases": TEST_DB_NAMES_NEW,
},
"delta_operations": [
{"action": "delete_db", "name": SUB_DB_NAME},
# also test the case when we try to delete a non-existent database
# shouldn't happen in normal operation,
# but can occur when failed operations are retried
{"action": "delete_db", "name": "nonexistent_db"},
],
},
"delta_operations": [
{"action": "delete_db", "name": SUB_DB_NAME},
# also test the case when we try to delete a non-existent database
# shouldn't happen in normal operation,
# but can occur when failed operations are retried
{"action": "delete_db", "name": "nonexistent_db"},
],
}
)
@@ -354,25 +364,27 @@ def test_drop_role_with_table_privileges_from_neon_superuser(neon_simple_env: Ne
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": False,
"cluster": {
"roles": [
{
# We need to create role via compute_ctl, because in this case it will receive
# additional grants equivalent to our real environment, so we can repro some
# issues.
"name": "neon",
# Some autocomplete-suggested hash, no specific meaning.
"encrypted_password": "SCRAM-SHA-256$4096:hBT22QjqpydQWqEulorfXA==$miBogcoj68JWYdsNB5PW1X6PjSLBEcNuctuhtGkb4PY=:hxk2gxkwxGo6P7GCtfpMlhA9zwHvPMsCz+NQf2HfvWk=",
"options": [],
},
],
"databases": [
{
"name": TEST_DB_NAME,
"owner": "neon",
},
],
"spec": {
"skip_pg_catalog_updates": False,
"cluster": {
"roles": [
{
# We need to create role via compute_ctl, because in this case it will receive
# additional grants equivalent to our real environment, so we can repro some
# issues.
"name": "neon",
# Some autocomplete-suggested hash, no specific meaning.
"encrypted_password": "SCRAM-SHA-256$4096:hBT22QjqpydQWqEulorfXA==$miBogcoj68JWYdsNB5PW1X6PjSLBEcNuctuhtGkb4PY=:hxk2gxkwxGo6P7GCtfpMlhA9zwHvPMsCz+NQf2HfvWk=",
"options": [],
},
],
"databases": [
{
"name": TEST_DB_NAME,
"owner": "neon",
},
],
},
},
}
)
@@ -415,13 +427,15 @@ def test_drop_role_with_table_privileges_from_neon_superuser(neon_simple_env: Ne
# Drop role via compute_ctl
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": False,
"delta_operations": [
{
"action": "delete_role",
"name": TEST_GRANTEE,
},
],
"spec": {
"skip_pg_catalog_updates": False,
"delta_operations": [
{
"action": "delete_role",
"name": TEST_GRANTEE,
},
],
},
}
)
endpoint.reconfigure()
@@ -444,13 +458,15 @@ def test_drop_role_with_table_privileges_from_neon_superuser(neon_simple_env: Ne
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": False,
"delta_operations": [
{
"action": "delete_role",
"name": "readonly2",
},
],
"spec": {
"skip_pg_catalog_updates": False,
"delta_operations": [
{
"action": "delete_role",
"name": "readonly2",
},
],
},
}
)
endpoint.reconfigure()
@@ -475,25 +491,27 @@ def test_drop_role_with_table_privileges_from_non_neon_superuser(neon_simple_env
endpoint = env.endpoints.create_start("main")
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": False,
"cluster": {
"roles": [
{
# We need to create role via compute_ctl, because in this case it will receive
# additional grants equivalent to our real environment, so we can repro some
# issues.
"name": TEST_GRANTOR,
# Some autocomplete-suggested hash, no specific meaning.
"encrypted_password": "SCRAM-SHA-256$4096:hBT22QjqpydQWqEulorfXA==$miBogcoj68JWYdsNB5PW1X6PjSLBEcNuctuhtGkb4PY=:hxk2gxkwxGo6P7GCtfpMlhA9zwHvPMsCz+NQf2HfvWk=",
"options": [],
},
],
"databases": [
{
"name": TEST_DB_NAME,
"owner": TEST_GRANTOR,
},
],
"spec": {
"skip_pg_catalog_updates": False,
"cluster": {
"roles": [
{
# We need to create role via compute_ctl, because in this case it will receive
# additional grants equivalent to our real environment, so we can repro some
# issues.
"name": TEST_GRANTOR,
# Some autocomplete-suggested hash, no specific meaning.
"encrypted_password": "SCRAM-SHA-256$4096:hBT22QjqpydQWqEulorfXA==$miBogcoj68JWYdsNB5PW1X6PjSLBEcNuctuhtGkb4PY=:hxk2gxkwxGo6P7GCtfpMlhA9zwHvPMsCz+NQf2HfvWk=",
"options": [],
},
],
"databases": [
{
"name": TEST_DB_NAME,
"owner": TEST_GRANTOR,
},
],
},
},
}
)
@@ -507,13 +525,15 @@ def test_drop_role_with_table_privileges_from_non_neon_superuser(neon_simple_env
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": False,
"delta_operations": [
{
"action": "delete_role",
"name": TEST_GRANTEE,
},
],
"spec": {
"skip_pg_catalog_updates": False,
"delta_operations": [
{
"action": "delete_role",
"name": TEST_GRANTEE,
},
],
},
}
)
endpoint.reconfigure()

View File

@@ -31,15 +31,17 @@ def test_compute_reconfigure(neon_simple_env: NeonEnv):
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": True,
"cluster": {
"settings": [
{
"name": "log_line_prefix",
"vartype": "string",
"value": TEST_LOG_LINE_PREFIX,
}
]
"spec": {
"skip_pg_catalog_updates": True,
"cluster": {
"settings": [
{
"name": "log_line_prefix",
"vartype": "string",
"value": TEST_LOG_LINE_PREFIX,
}
]
},
},
}
)

View File

@@ -251,7 +251,7 @@ def test_multiple_subscription_branching(neon_simple_env: NeonEnv):
NUMBER_OF_DBS = 5
# Create and start endpoint so that neon_local put all the generated
# stuff into the spec.json file.
# stuff into the config.json file.
endpoint = env.endpoints.create_start(
"main",
config_lines=[
@@ -280,13 +280,15 @@ def test_multiple_subscription_branching(neon_simple_env: NeonEnv):
}
)
# Update the spec.json file to create the databases
# Update the config.json file to create the databases
# and reconfigure the endpoint to apply the changes.
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": False,
"cluster": {
"databases": TEST_DB_NAMES,
"spec": {
"skip_pg_catalog_updates": False,
"cluster": {
"databases": TEST_DB_NAMES,
},
},
}
)