Implement a second HTTP server within compute_ctl (#10574)

The compute_ctl HTTP server has the following purposes:

- Allow management via the control plane
- Provide an endpoint for scraping metrics
- Provide APIs for compute internal clients
  - Neon Postgres extension for installing remote extensions
  - local_proxy for installing extensions and adding grants

The first two purposes require the HTTP server to be available outside
the compute.

The Neon threat model is a bad actor within our internal network. We
need to reduce the surface area of attack. By exposing unnecessary
unauthenticated HTTP endpoints to the internal network, we increase the
surface area of attack. For endpoints described in the third bullet
point, we can just run an extra HTTP server, which is only bound to the
loopback interface since all consumers of those endpoints are within the
compute.
This commit is contained in:
Tristan Partin
2025-02-11 12:02:22 -06:00
committed by GitHub
parent f7b2293317
commit da9c101939
16 changed files with 310 additions and 167 deletions

View File

@@ -6,16 +6,16 @@ index da723b8..5328114 100644
---- ----
-- No.A-1-1-3 -- No.A-1-1-3
CREATE EXTENSION pg_hint_plan; CREATE EXTENSION pg_hint_plan;
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan +LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
-- No.A-1-2-3 -- No.A-1-2-3
DROP EXTENSION pg_hint_plan; DROP EXTENSION pg_hint_plan;
-- No.A-1-1-4 -- No.A-1-1-4
CREATE SCHEMA other_schema; CREATE SCHEMA other_schema;
CREATE EXTENSION pg_hint_plan SCHEMA other_schema; CREATE EXTENSION pg_hint_plan SCHEMA other_schema;
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan +LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan" ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan"
CREATE EXTENSION pg_hint_plan; CREATE EXTENSION pg_hint_plan;
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan +LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
DROP SCHEMA other_schema; DROP SCHEMA other_schema;
---- ----
---- No. A-5-1 comment pattern ---- No. A-5-1 comment pattern
@@ -35,7 +35,7 @@ index d372459..6282afe 100644
SET client_min_messages TO LOG; SET client_min_messages TO LOG;
SET pg_hint_plan.enable_hint TO on; SET pg_hint_plan.enable_hint TO on;
CREATE EXTENSION file_fdw; CREATE EXTENSION file_fdw;
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/file_fdw +LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/file_fdw
CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw; CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
CREATE USER MAPPING FOR PUBLIC SERVER file_server; CREATE USER MAPPING FOR PUBLIC SERVER file_server;
CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename'); CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename');

View File

@@ -6,16 +6,16 @@ index e7d68a1..65a056c 100644
---- ----
-- No.A-1-1-3 -- No.A-1-1-3
CREATE EXTENSION pg_hint_plan; CREATE EXTENSION pg_hint_plan;
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan +LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
-- No.A-1-2-3 -- No.A-1-2-3
DROP EXTENSION pg_hint_plan; DROP EXTENSION pg_hint_plan;
-- No.A-1-1-4 -- No.A-1-1-4
CREATE SCHEMA other_schema; CREATE SCHEMA other_schema;
CREATE EXTENSION pg_hint_plan SCHEMA other_schema; CREATE EXTENSION pg_hint_plan SCHEMA other_schema;
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan +LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan" ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan"
CREATE EXTENSION pg_hint_plan; CREATE EXTENSION pg_hint_plan;
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan +LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
DROP SCHEMA other_schema; DROP SCHEMA other_schema;
---- ----
---- No. A-5-1 comment pattern ---- No. A-5-1 comment pattern
@@ -168,7 +168,7 @@ index 017fa4b..98d989b 100644
SET client_min_messages TO LOG; SET client_min_messages TO LOG;
SET pg_hint_plan.enable_hint TO on; SET pg_hint_plan.enable_hint TO on;
CREATE EXTENSION file_fdw; CREATE EXTENSION file_fdw;
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/file_fdw +LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/file_fdw
CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw; CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
CREATE USER MAPPING FOR PUBLIC SERVER file_server; CREATE USER MAPPING FOR PUBLIC SERVER file_server;
CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename'); CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename');

View File

@@ -48,6 +48,7 @@ use anyhow::{Context, Result};
use chrono::Utc; use chrono::Utc;
use clap::Parser; use clap::Parser;
use compute_tools::disk_quota::set_disk_quota; use compute_tools::disk_quota::set_disk_quota;
use compute_tools::http::server::Server;
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static; use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use signal_hook::consts::{SIGQUIT, SIGTERM}; use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals}; use signal_hook::{consts::SIGINT, iterator::Signals};
@@ -62,7 +63,6 @@ use compute_tools::compute::{
}; };
use compute_tools::configurator::launch_configurator; use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::get_pg_version_string; use compute_tools::extension_server::get_pg_version_string;
use compute_tools::http::launch_http_server;
use compute_tools::logger::*; use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor; use compute_tools::monitor::launch_monitor;
use compute_tools::params::*; use compute_tools::params::*;
@@ -108,8 +108,20 @@ struct Cli {
#[arg(short = 'r', long, value_parser = parse_remote_ext_config)] #[arg(short = 'r', long, value_parser = parse_remote_ext_config)]
pub remote_ext_config: Option<String>, pub remote_ext_config: Option<String>,
#[arg(long, default_value_t = 3080)] /// The port to bind the external listening HTTP server to. Clients running
pub http_port: u16, /// outside the compute will talk to the compute through this port. Keep
/// the previous name for this argument around for a smoother release
/// with the control plane.
///
/// TODO: Remove the alias after the control plane release which teaches the
/// control plane about the renamed argument.
#[arg(long, alias = "http-port", default_value_t = 3080)]
pub external_http_port: u16,
/// The port to bind the internal listening HTTP server to. Clients like
/// the neon extension (for installing remote extensions) and local_proxy.
#[arg(long)]
pub internal_http_port: Option<u16>,
#[arg(short = 'D', long, value_name = "DATADIR")] #[arg(short = 'D', long, value_name = "DATADIR")]
pub pgdata: String, pub pgdata: String,
@@ -340,7 +352,8 @@ fn wait_spec(
pgdata: cli.pgdata.clone(), pgdata: cli.pgdata.clone(),
pgbin: cli.pgbin.clone(), pgbin: cli.pgbin.clone(),
pgversion: get_pg_version_string(&cli.pgbin), pgversion: get_pg_version_string(&cli.pgbin),
http_port: cli.http_port, external_http_port: cli.external_http_port,
internal_http_port: cli.internal_http_port.unwrap_or(cli.external_http_port + 1),
live_config_allowed, live_config_allowed,
state: Mutex::new(new_state), state: Mutex::new(new_state),
state_changed: Condvar::new(), state_changed: Condvar::new(),
@@ -358,9 +371,13 @@ fn wait_spec(
compute.prewarm_postgres()?; compute.prewarm_postgres()?;
} }
// Launch http service first, so that we can serve control-plane requests // Launch the external HTTP server first, so that we can serve control plane
// while configuration is still in progress. // requests while configuration is still in progress.
let _http_handle = launch_http_server(cli.http_port, &compute); Server::External(cli.external_http_port).launch(&compute);
// The internal HTTP server could be launched later, but there isn't much
// sense in waiting.
Server::Internal(cli.internal_http_port.unwrap_or(cli.external_http_port + 1)).launch(&compute);
if !spec_set { if !spec_set {
// No spec provided, hang waiting for it. // No spec provided, hang waiting for it.

View File

@@ -82,8 +82,10 @@ pub struct ComputeNode {
/// - we push spec and it does configuration /// - we push spec and it does configuration
/// - but then it is restarted without any spec again /// - but then it is restarted without any spec again
pub live_config_allowed: bool, pub live_config_allowed: bool,
/// The port that the compute's HTTP server listens on /// The port that the compute's external HTTP server listens on
pub http_port: u16, pub external_http_port: u16,
/// The port that the compute's internal HTTP server listens on
pub internal_http_port: u16,
/// Volatile part of the `ComputeNode`, which should be used under `Mutex`. /// Volatile part of the `ComputeNode`, which should be used under `Mutex`.
/// To allow HTTP API server to serving status requests, while configuration /// To allow HTTP API server to serving status requests, while configuration
/// is in progress, lock should be held only for short periods of time to do /// is in progress, lock should be held only for short periods of time to do
@@ -631,7 +633,7 @@ impl ComputeNode {
config::write_postgres_conf( config::write_postgres_conf(
&pgdata_path.join("postgresql.conf"), &pgdata_path.join("postgresql.conf"),
&pspec.spec, &pspec.spec,
self.http_port, self.internal_http_port,
)?; )?;
// Syncing safekeepers is only safe with primary nodes: if a primary // Syncing safekeepers is only safe with primary nodes: if a primary
@@ -1396,7 +1398,7 @@ impl ComputeNode {
// Write new config // Write new config
let pgdata_path = Path::new(&self.pgdata); let pgdata_path = Path::new(&self.pgdata);
let postgresql_conf_path = pgdata_path.join("postgresql.conf"); let postgresql_conf_path = pgdata_path.join("postgresql.conf");
config::write_postgres_conf(&postgresql_conf_path, &spec, self.http_port)?; config::write_postgres_conf(&postgresql_conf_path, &spec, self.internal_http_port)?;
let max_concurrent_connections = spec.reconfigure_concurrency; let max_concurrent_connections = spec.reconfigure_concurrency;

View File

@@ -4,11 +4,9 @@ use http::{header::CONTENT_TYPE, StatusCode};
use serde::Serialize; use serde::Serialize;
use tracing::error; use tracing::error;
pub use server::launch_http_server;
mod extract; mod extract;
mod routes; mod routes;
mod server; pub mod server;
/// Convenience response builder for JSON responses /// Convenience response builder for JSON responses
struct JsonResponse; struct JsonResponse;

View File

@@ -1,9 +1,11 @@
use std::{ use std::{
fmt::Display,
net::{IpAddr, Ipv6Addr, SocketAddr}, net::{IpAddr, Ipv6Addr, SocketAddr},
sync::Arc, sync::Arc,
time::Duration, time::Duration,
}; };
use anyhow::Result;
use axum::{ use axum::{
extract::Request, extract::Request,
middleware::{self, Next}, middleware::{self, Next},
@@ -24,45 +26,65 @@ use super::routes::{
}; };
use crate::compute::ComputeNode; use crate::compute::ComputeNode;
async fn handle_404() -> Response {
StatusCode::NOT_FOUND.into_response()
}
const X_REQUEST_ID: &str = "x-request-id"; const X_REQUEST_ID: &str = "x-request-id";
/// This middleware function allows compute_ctl to generate its own request ID /// `compute_ctl` has two servers: internal and external. The internal server
/// if one isn't supplied. The control plane will always send one as a UUID. The /// binds to the loopback interface and handles communication from clients on
/// neon Postgres extension on the other hand does not send one. /// the compute. The external server is what receives communication from the
async fn maybe_add_request_id_header(mut request: Request, next: Next) -> Response { /// control plane, the metrics scraper, etc. We make the distinction because
let headers = request.headers_mut(); /// certain routes in `compute_ctl` only need to be exposed to local processes
/// like Postgres via the neon extension and local_proxy.
if headers.get(X_REQUEST_ID).is_none() { #[derive(Clone, Copy, Debug)]
headers.append(X_REQUEST_ID, Uuid::new_v4().to_string().parse().unwrap()); pub enum Server {
} Internal(u16),
External(u16),
next.run(request).await
} }
/// Run the HTTP server and wait on it forever. impl Display for Server {
async fn serve(port: u16, compute: Arc<ComputeNode>) { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut app = Router::new() match self {
.route("/check_writability", post(check_writability::is_writable)) Server::Internal(_) => f.write_str("internal"),
.route("/configure", post(configure::configure)) Server::External(_) => f.write_str("external"),
.route("/database_schema", get(database_schema::get_schema_dump)) }
.route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects)) }
.route( }
"/extension_server/{*filename}",
post(extension_server::download_extension), impl From<Server> for Router<Arc<ComputeNode>> {
) fn from(server: Server) -> Self {
.route("/extensions", post(extensions::install_extension)) let mut router = Router::<Arc<ComputeNode>>::new();
.route("/grants", post(grants::add_grant))
.route("/insights", get(insights::get_insights)) router = match server {
.route("/metrics", get(metrics::get_metrics)) Server::Internal(_) => {
.route("/metrics.json", get(metrics_json::get_metrics)) router = router
.route("/status", get(status::get_status)) .route(
.route("/terminate", post(terminate::terminate)) "/extension_server/{*filename}",
.fallback(handle_404) post(extension_server::download_extension),
.layer( )
.route("/extensions", post(extensions::install_extension))
.route("/grants", post(grants::add_grant));
// Add in any testing support
if cfg!(feature = "testing") {
use super::routes::failpoints;
router = router.route("/failpoints", post(failpoints::configure_failpoints));
}
router
}
Server::External(_) => router
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
.route("/database_schema", get(database_schema::get_schema_dump))
.route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects))
.route("/insights", get(insights::get_insights))
.route("/metrics", get(metrics::get_metrics))
.route("/metrics.json", get(metrics_json::get_metrics))
.route("/status", get(status::get_status))
.route("/terminate", post(terminate::terminate)),
};
router.fallback(Server::handle_404).method_not_allowed_fallback(Server::handle_405).layer(
ServiceBuilder::new() ServiceBuilder::new()
// Add this middleware since we assume the request ID exists // Add this middleware since we assume the request ID exists
.layer(middleware::from_fn(maybe_add_request_id_header)) .layer(middleware::from_fn(maybe_add_request_id_header))
@@ -102,43 +124,88 @@ async fn serve(port: u16, compute: Arc<ComputeNode>) {
) )
.layer(PropagateRequestIdLayer::x_request_id()), .layer(PropagateRequestIdLayer::x_request_id()),
) )
.with_state(compute); }
}
// Add in any testing support impl Server {
if cfg!(feature = "testing") { async fn handle_404() -> impl IntoResponse {
use super::routes::failpoints; StatusCode::NOT_FOUND
app = app.route("/failpoints", post(failpoints::configure_failpoints))
} }
// This usually binds to both IPv4 and IPv6 on Linux, see async fn handle_405() -> impl IntoResponse {
// https://github.com/rust-lang/rust/pull/34440 for more information StatusCode::METHOD_NOT_ALLOWED
let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port); }
let listener = match TcpListener::bind(&addr).await {
Ok(listener) => listener, async fn listener(&self) -> Result<TcpListener> {
Err(e) => { let addr = SocketAddr::new(self.ip(), self.port());
error!( let listener = TcpListener::bind(&addr).await?;
"failed to bind the compute_ctl HTTP server to port {}: {}",
port, e Ok(listener)
); }
return;
fn ip(&self) -> IpAddr {
match self {
// TODO: Change this to Ipv6Addr::LOCALHOST when the GitHub runners
// allow binding to localhost
Server::Internal(_) => IpAddr::from(Ipv6Addr::UNSPECIFIED),
Server::External(_) => IpAddr::from(Ipv6Addr::UNSPECIFIED),
} }
};
if let Ok(local_addr) = listener.local_addr() {
info!("compute_ctl HTTP server listening on {}", local_addr);
} else {
info!("compute_ctl HTTP server listening on port {}", port);
} }
if let Err(e) = axum::serve(listener, app).await { fn port(self) -> u16 {
error!("compute_ctl HTTP server error: {}", e); match self {
Server::Internal(port) => port,
Server::External(port) => port,
}
}
async fn serve(self, compute: Arc<ComputeNode>) {
let listener = self.listener().await.unwrap_or_else(|e| {
// If we can't bind, the compute cannot operate correctly
panic!(
"failed to bind the compute_ctl {} HTTP server to {}: {}",
self,
SocketAddr::new(self.ip(), self.port()),
e
);
});
if tracing::enabled!(tracing::Level::INFO) {
let local_addr = match listener.local_addr() {
Ok(local_addr) => local_addr,
Err(_) => SocketAddr::new(self.ip(), self.port()),
};
info!(
"compute_ctl {} HTTP server listening at {}",
self, local_addr
);
}
let router = Router::from(self).with_state(compute);
if let Err(e) = axum::serve(listener, router).await {
error!("compute_ctl {} HTTP server error: {}", self, e);
}
}
pub fn launch(self, compute: &Arc<ComputeNode>) {
let state = Arc::clone(compute);
info!("Launching the {} server", self);
tokio::spawn(self.serve(state));
} }
} }
/// Launch HTTP server in a new task and return its `JoinHandle`. /// This middleware function allows compute_ctl to generate its own request ID
pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> tokio::task::JoinHandle<()> { /// if one isn't supplied. The control plane will always send one as a UUID. The
let state = Arc::clone(state); /// neon Postgres extension on the other hand does not send one.
async fn maybe_add_request_id_header(mut request: Request, next: Next) -> Response {
let headers = request.headers_mut();
if headers.get(X_REQUEST_ID).is_none() {
headers.append(X_REQUEST_ID, Uuid::new_v4().to_string().parse().unwrap());
}
tokio::spawn(serve(port, state)) next.run(request).await
} }

View File

@@ -552,8 +552,10 @@ struct EndpointCreateCmdArgs {
lsn: Option<Lsn>, lsn: Option<Lsn>,
#[clap(long)] #[clap(long)]
pg_port: Option<u16>, pg_port: Option<u16>,
#[clap(long, alias = "http-port")]
external_http_port: Option<u16>,
#[clap(long)] #[clap(long)]
http_port: Option<u16>, internal_http_port: Option<u16>,
#[clap(long = "pageserver-id")] #[clap(long = "pageserver-id")]
endpoint_pageserver_id: Option<NodeId>, endpoint_pageserver_id: Option<NodeId>,
@@ -1353,7 +1355,8 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
tenant_id, tenant_id,
timeline_id, timeline_id,
args.pg_port, args.pg_port,
args.http_port, args.external_http_port,
args.internal_http_port,
args.pg_version, args.pg_version,
mode, mode,
!args.update_catalog, !args.update_catalog,

View File

@@ -37,6 +37,8 @@
//! ``` //! ```
//! //!
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::net::TcpStream; use std::net::TcpStream;
use std::path::PathBuf; use std::path::PathBuf;
@@ -73,7 +75,8 @@ pub struct EndpointConf {
timeline_id: TimelineId, timeline_id: TimelineId,
mode: ComputeMode, mode: ComputeMode,
pg_port: u16, pg_port: u16,
http_port: u16, external_http_port: u16,
internal_http_port: u16,
pg_version: u32, pg_version: u32,
skip_pg_catalog_updates: bool, skip_pg_catalog_updates: bool,
drop_subscriptions_before_start: bool, drop_subscriptions_before_start: bool,
@@ -128,7 +131,7 @@ impl ComputeControlPlane {
1 + self 1 + self
.endpoints .endpoints
.values() .values()
.map(|ep| std::cmp::max(ep.pg_address.port(), ep.http_address.port())) .map(|ep| std::cmp::max(ep.pg_address.port(), ep.external_http_address.port()))
.max() .max()
.unwrap_or(self.base_port) .unwrap_or(self.base_port)
} }
@@ -140,18 +143,27 @@ impl ComputeControlPlane {
tenant_id: TenantId, tenant_id: TenantId,
timeline_id: TimelineId, timeline_id: TimelineId,
pg_port: Option<u16>, pg_port: Option<u16>,
http_port: Option<u16>, external_http_port: Option<u16>,
internal_http_port: Option<u16>,
pg_version: u32, pg_version: u32,
mode: ComputeMode, mode: ComputeMode,
skip_pg_catalog_updates: bool, skip_pg_catalog_updates: bool,
drop_subscriptions_before_start: bool, drop_subscriptions_before_start: bool,
) -> Result<Arc<Endpoint>> { ) -> Result<Arc<Endpoint>> {
let pg_port = pg_port.unwrap_or_else(|| self.get_port()); let pg_port = pg_port.unwrap_or_else(|| self.get_port());
let http_port = http_port.unwrap_or_else(|| self.get_port() + 1); let external_http_port = external_http_port.unwrap_or_else(|| self.get_port() + 1);
let internal_http_port = internal_http_port.unwrap_or_else(|| external_http_port + 1);
let ep = Arc::new(Endpoint { let ep = Arc::new(Endpoint {
endpoint_id: endpoint_id.to_owned(), endpoint_id: endpoint_id.to_owned(),
pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port), pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), pg_port),
http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port), external_http_address: SocketAddr::new(
IpAddr::from(Ipv4Addr::UNSPECIFIED),
external_http_port,
),
internal_http_address: SocketAddr::new(
IpAddr::from(Ipv4Addr::LOCALHOST),
internal_http_port,
),
env: self.env.clone(), env: self.env.clone(),
timeline_id, timeline_id,
mode, mode,
@@ -176,7 +188,8 @@ impl ComputeControlPlane {
tenant_id, tenant_id,
timeline_id, timeline_id,
mode, mode,
http_port, external_http_port,
internal_http_port,
pg_port, pg_port,
pg_version, pg_version,
skip_pg_catalog_updates, skip_pg_catalog_updates,
@@ -230,9 +243,10 @@ pub struct Endpoint {
pub timeline_id: TimelineId, pub timeline_id: TimelineId,
pub mode: ComputeMode, pub mode: ComputeMode,
// port and address of the Postgres server and `compute_ctl`'s HTTP API // port and address of the Postgres server and `compute_ctl`'s HTTP APIs
pub pg_address: SocketAddr, pub pg_address: SocketAddr,
pub http_address: SocketAddr, pub external_http_address: SocketAddr,
pub internal_http_address: SocketAddr,
// postgres major version in the format: 14, 15, etc. // postgres major version in the format: 14, 15, etc.
pg_version: u32, pg_version: u32,
@@ -287,8 +301,15 @@ impl Endpoint {
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?; serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
Ok(Endpoint { Ok(Endpoint {
pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port), pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), conf.pg_port),
http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port), external_http_address: SocketAddr::new(
IpAddr::from(Ipv4Addr::UNSPECIFIED),
conf.external_http_port,
),
internal_http_address: SocketAddr::new(
IpAddr::from(Ipv4Addr::LOCALHOST),
conf.internal_http_port,
),
endpoint_id, endpoint_id,
env: env.clone(), env: env.clone(),
timeline_id: conf.timeline_id, timeline_id: conf.timeline_id,
@@ -650,40 +671,51 @@ impl Endpoint {
println!("Also at '{}'", conn_str); println!("Also at '{}'", conn_str);
} }
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl")); let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
cmd.args(["--http-port", &self.http_address.port().to_string()]) //cmd.args([
.args(["--pgdata", self.pgdata().to_str().unwrap()]) // "--external-http-port",
.args(["--connstr", &conn_str]) // &self.external_http_address.port().to_string(),
.args([ //])
"--spec-path", //.args([
self.endpoint_path().join("spec.json").to_str().unwrap(), // "--internal-http-port",
]) // &self.internal_http_address.port().to_string(),
.args([ //])
"--pgbin", cmd.args([
self.env "--http-port",
.pg_bin_dir(self.pg_version)? &self.external_http_address.port().to_string(),
.join("postgres") ])
.to_str() .args(["--pgdata", self.pgdata().to_str().unwrap()])
.unwrap(), .args(["--connstr", &conn_str])
]) .args([
// TODO: It would be nice if we generated compute IDs with the same "--spec-path",
// algorithm as the real control plane. self.endpoint_path().join("spec.json").to_str().unwrap(),
// ])
// TODO: Add this back when .args([
// https://github.com/neondatabase/neon/pull/10747 is merged. "--pgbin",
// self.env
//.args([ .pg_bin_dir(self.pg_version)?
// "--compute-id", .join("postgres")
// &format!( .to_str()
// "compute-{}", .unwrap(),
// SystemTime::now() ])
// .duration_since(UNIX_EPOCH) // TODO: It would be nice if we generated compute IDs with the same
// .unwrap() // algorithm as the real control plane.
// .as_secs() //
// ), // TODO: Add this back when
//]) // https://github.com/neondatabase/neon/pull/10747 is merged.
.stdin(std::process::Stdio::null()) //
.stderr(logfile.try_clone()?) //.args([
.stdout(logfile); // "--compute-id",
// &format!(
// "compute-{}",
// SystemTime::now()
// .duration_since(UNIX_EPOCH)
// .unwrap()
// .as_secs()
// ),
//])
.stdin(std::process::Stdio::null())
.stderr(logfile.try_clone()?)
.stdout(logfile);
if let Some(remote_ext_config) = remote_ext_config { if let Some(remote_ext_config) = remote_ext_config {
cmd.args(["--remote-ext-config", remote_ext_config]); cmd.args(["--remote-ext-config", remote_ext_config]);
@@ -770,8 +802,8 @@ impl Endpoint {
reqwest::Method::GET, reqwest::Method::GET,
format!( format!(
"http://{}:{}/status", "http://{}:{}/status",
self.http_address.ip(), self.external_http_address.ip(),
self.http_address.port() self.external_http_address.port()
), ),
) )
.send() .send()
@@ -844,8 +876,8 @@ impl Endpoint {
let response = client let response = client
.post(format!( .post(format!(
"http://{}:{}/configure", "http://{}:{}/configure",
self.http_address.ip(), self.external_http_address.ip(),
self.http_address.port() self.external_http_address.port()
)) ))
.header(CONTENT_TYPE.as_str(), "application/json") .header(CONTENT_TYPE.as_str(), "application/json")
.body(format!( .body(format!(

View File

@@ -85,8 +85,8 @@ struct LocalProxyCliArgs {
/// Address of the postgres server /// Address of the postgres server
#[clap(long, default_value = "127.0.0.1:5432")] #[clap(long, default_value = "127.0.0.1:5432")]
postgres: SocketAddr, postgres: SocketAddr,
/// Address of the compute-ctl api service /// Address of the internal compute-ctl api service
#[clap(long, default_value = "http://127.0.0.1:3080/")] #[clap(long, default_value = "http://127.0.0.1:3081/")]
compute_ctl: ApiUrl, compute_ctl: ApiUrl,
/// Path of the local proxy config file /// Path of the local proxy config file
#[clap(long, default_value = "./local_proxy.json")] #[clap(long, default_value = "./local_proxy.json")]

View File

@@ -9,21 +9,23 @@ from requests.adapters import HTTPAdapter
class EndpointHttpClient(requests.Session): class EndpointHttpClient(requests.Session):
def __init__( def __init__(
self, self,
port: int, external_port: int,
internal_port: int,
): ):
super().__init__() super().__init__()
self.port = port self.external_port: int = external_port
self.internal_port: int = internal_port
self.mount("http://", HTTPAdapter()) self.mount("http://", HTTPAdapter())
def dbs_and_roles(self): def dbs_and_roles(self):
res = self.get(f"http://localhost:{self.port}/dbs_and_roles") res = self.get(f"http://localhost:{self.external_port}/dbs_and_roles")
res.raise_for_status() res.raise_for_status()
return res.json() return res.json()
def database_schema(self, database: str): def database_schema(self, database: str):
res = self.get( res = self.get(
f"http://localhost:{self.port}/database_schema?database={urllib.parse.quote(database, safe='')}" f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}"
) )
res.raise_for_status() res.raise_for_status()
return res.text return res.text
@@ -34,20 +36,20 @@ class EndpointHttpClient(requests.Session):
"version": version, "version": version,
"database": database, "database": database,
} }
res = self.post(f"http://localhost:{self.port}/extensions", json=body) res = self.post(f"http://localhost:{self.internal_port}/extensions", json=body)
res.raise_for_status() res.raise_for_status()
return res.json() return res.json()
def set_role_grants(self, database: str, role: str, schema: str, privileges: list[str]): def set_role_grants(self, database: str, role: str, schema: str, privileges: list[str]):
res = self.post( res = self.post(
f"http://localhost:{self.port}/grants", f"http://localhost:{self.internal_port}/grants",
json={"database": database, "schema": schema, "role": role, "privileges": privileges}, json={"database": database, "schema": schema, "role": role, "privileges": privileges},
) )
res.raise_for_status() res.raise_for_status()
return res.json() return res.json()
def metrics(self) -> str: def metrics(self) -> str:
res = self.get(f"http://localhost:{self.port}/metrics") res = self.get(f"http://localhost:{self.external_port}/metrics")
res.raise_for_status() res.raise_for_status()
return res.text return res.text
@@ -62,5 +64,5 @@ class EndpointHttpClient(requests.Session):
} }
) )
res = self.post(f"http://localhost:{self.port}/failpoints", json=body) res = self.post(f"http://localhost:{self.internal_port}/failpoints", json=body)
res.raise_for_status() res.raise_for_status()

View File

@@ -478,7 +478,8 @@ class NeonLocalCli(AbstractNeonCli):
self, self,
branch_name: str, branch_name: str,
pg_port: int, pg_port: int,
http_port: int, external_http_port: int,
internal_http_port: int,
tenant_id: TenantId, tenant_id: TenantId,
pg_version: PgVersion, pg_version: PgVersion,
endpoint_id: str | None = None, endpoint_id: str | None = None,
@@ -501,8 +502,10 @@ class NeonLocalCli(AbstractNeonCli):
args.extend(["--lsn", str(lsn)]) args.extend(["--lsn", str(lsn)])
if pg_port is not None: if pg_port is not None:
args.extend(["--pg-port", str(pg_port)]) args.extend(["--pg-port", str(pg_port)])
if http_port is not None: if external_http_port is not None:
args.extend(["--http-port", str(http_port)]) args.extend(["--external-http-port", str(external_http_port)])
if internal_http_port is not None:
args.extend(["--internal-http-port", str(internal_http_port)])
if endpoint_id is not None: if endpoint_id is not None:
args.append(endpoint_id) args.append(endpoint_id)
if hot_standby: if hot_standby:

View File

@@ -3807,7 +3807,8 @@ class Endpoint(PgProtocol, LogUtils):
env: NeonEnv, env: NeonEnv,
tenant_id: TenantId, tenant_id: TenantId,
pg_port: int, pg_port: int,
http_port: int, external_http_port: int,
internal_http_port: int,
check_stop_result: bool = True, check_stop_result: bool = True,
): ):
super().__init__(host="localhost", port=pg_port, user="cloud_admin", dbname="postgres") super().__init__(host="localhost", port=pg_port, user="cloud_admin", dbname="postgres")
@@ -3817,7 +3818,8 @@ class Endpoint(PgProtocol, LogUtils):
self.pgdata_dir: Path | None = None # Path to computenode PGDATA self.pgdata_dir: Path | None = None # Path to computenode PGDATA
self.tenant_id = tenant_id self.tenant_id = tenant_id
self.pg_port = pg_port self.pg_port = pg_port
self.http_port = http_port self.external_http_port = external_http_port
self.internal_http_port = internal_http_port
self.check_stop_result = check_stop_result self.check_stop_result = check_stop_result
# passed to endpoint create and endpoint reconfigure # passed to endpoint create and endpoint reconfigure
self.active_safekeepers: list[int] = list(map(lambda sk: sk.id, env.safekeepers)) self.active_safekeepers: list[int] = list(map(lambda sk: sk.id, env.safekeepers))
@@ -3834,7 +3836,8 @@ class Endpoint(PgProtocol, LogUtils):
self, auth_token: str | None = None, retries: Retry | None = None self, auth_token: str | None = None, retries: Retry | None = None
) -> EndpointHttpClient: ) -> EndpointHttpClient:
return EndpointHttpClient( return EndpointHttpClient(
port=self.http_port, external_port=self.external_http_port,
internal_port=self.internal_http_port,
) )
def create( def create(
@@ -3866,7 +3869,8 @@ class Endpoint(PgProtocol, LogUtils):
lsn=lsn, lsn=lsn,
hot_standby=hot_standby, hot_standby=hot_standby,
pg_port=self.pg_port, pg_port=self.pg_port,
http_port=self.http_port, external_http_port=self.external_http_port,
internal_http_port=self.internal_http_port,
pg_version=self.env.pg_version, pg_version=self.env.pg_version,
pageserver_id=pageserver_id, pageserver_id=pageserver_id,
allow_multiple=allow_multiple, allow_multiple=allow_multiple,
@@ -4258,7 +4262,8 @@ class EndpointFactory:
self.env, self.env,
tenant_id=tenant_id or self.env.initial_tenant, tenant_id=tenant_id or self.env.initial_tenant,
pg_port=self.env.port_distributor.get_port(), pg_port=self.env.port_distributor.get_port(),
http_port=self.env.port_distributor.get_port(), external_http_port=self.env.port_distributor.get_port(),
internal_http_port=self.env.port_distributor.get_port(),
) )
self.num_instances += 1 self.num_instances += 1
self.endpoints.append(ep) self.endpoints.append(ep)
@@ -4288,7 +4293,8 @@ class EndpointFactory:
self.env, self.env,
tenant_id=tenant_id or self.env.initial_tenant, tenant_id=tenant_id or self.env.initial_tenant,
pg_port=self.env.port_distributor.get_port(), pg_port=self.env.port_distributor.get_port(),
http_port=self.env.port_distributor.get_port(), external_http_port=self.env.port_distributor.get_port(),
internal_http_port=self.env.port_distributor.get_port(),
) )
endpoint_id = endpoint_id or self.env.generate_endpoint_id() endpoint_id = endpoint_id or self.env.generate_endpoint_id()

View File

@@ -79,7 +79,9 @@ def test_lazy_startup(slru: str, neon_env_builder: NeonEnvBuilder, zenbenchmark:
assert sum == 1000000 assert sum == 1000000
# Get metrics # Get metrics
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json() metrics = requests.get(
f"http://localhost:{endpoint.external_http_port}/metrics.json"
).json()
durations = { durations = {
"wait_for_spec_ms": f"{slru}_{i}_wait_for_spec", "wait_for_spec_ms": f"{slru}_{i}_wait_for_spec",
"sync_safekeepers_ms": f"{slru}_{i}_sync_safekeepers", "sync_safekeepers_ms": f"{slru}_{i}_sync_safekeepers",

View File

@@ -56,7 +56,9 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
endpoint.safe_psql("select 1;") endpoint.safe_psql("select 1;")
# Get metrics # Get metrics
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json() metrics = requests.get(
f"http://localhost:{endpoint.external_http_port}/metrics.json"
).json()
durations = { durations = {
"wait_for_spec_ms": f"{i}_wait_for_spec", "wait_for_spec_ms": f"{i}_wait_for_spec",
"sync_safekeepers_ms": f"{i}_sync_safekeepers", "sync_safekeepers_ms": f"{i}_sync_safekeepers",

View File

@@ -17,11 +17,13 @@ def test_neon_cli_basics(neon_env_builder: NeonEnvBuilder, port_distributor: Por
main_branch_name = "main" main_branch_name = "main"
pg_port = port_distributor.get_port() pg_port = port_distributor.get_port()
http_port = port_distributor.get_port() external_http_port = port_distributor.get_port()
internal_http_port = port_distributor.get_port()
env.neon_cli.endpoint_create( env.neon_cli.endpoint_create(
main_branch_name, main_branch_name,
pg_port, pg_port,
http_port, external_http_port,
internal_http_port,
endpoint_id="ep-basic-main", endpoint_id="ep-basic-main",
tenant_id=env.initial_tenant, tenant_id=env.initial_tenant,
pg_version=env.pg_version, pg_version=env.pg_version,
@@ -35,11 +37,13 @@ def test_neon_cli_basics(neon_env_builder: NeonEnvBuilder, port_distributor: Por
new_branch_name=branch_name, new_branch_name=branch_name,
) )
pg_port = port_distributor.get_port() pg_port = port_distributor.get_port()
http_port = port_distributor.get_port() external_http_port = port_distributor.get_port()
internal_http_port = port_distributor.get_port()
env.neon_cli.endpoint_create( env.neon_cli.endpoint_create(
branch_name, branch_name,
pg_port, pg_port,
http_port, external_http_port,
internal_http_port,
endpoint_id=f"ep-{branch_name}", endpoint_id=f"ep-{branch_name}",
tenant_id=env.initial_tenant, tenant_id=env.initial_tenant,
pg_version=env.pg_version, pg_version=env.pg_version,
@@ -59,23 +63,27 @@ def test_neon_two_primary_endpoints_fail(
branch_name = "main" branch_name = "main"
pg_port = port_distributor.get_port() pg_port = port_distributor.get_port()
http_port = port_distributor.get_port() external_http_port = port_distributor.get_port()
internal_http_port = port_distributor.get_port()
env.neon_cli.endpoint_create( env.neon_cli.endpoint_create(
branch_name, branch_name,
pg_port, pg_port,
http_port, external_http_port,
internal_http_port,
endpoint_id="ep1", endpoint_id="ep1",
tenant_id=env.initial_tenant, tenant_id=env.initial_tenant,
pg_version=env.pg_version, pg_version=env.pg_version,
) )
pg_port = port_distributor.get_port() pg_port = port_distributor.get_port()
http_port = port_distributor.get_port() external_http_port = port_distributor.get_port()
internal_http_port = port_distributor.get_port()
# ep1 is not running so create will succeed # ep1 is not running so create will succeed
env.neon_cli.endpoint_create( env.neon_cli.endpoint_create(
branch_name, branch_name,
pg_port, pg_port,
http_port, external_http_port,
internal_http_port,
endpoint_id="ep2", endpoint_id="ep2",
tenant_id=env.initial_tenant, tenant_id=env.initial_tenant,
pg_version=env.pg_version, pg_version=env.pg_version,

View File

@@ -268,7 +268,8 @@ def endpoint_create_start(
env, env,
tenant_id=env.initial_tenant, tenant_id=env.initial_tenant,
pg_port=env.port_distributor.get_port(), pg_port=env.port_distributor.get_port(),
http_port=env.port_distributor.get_port(), external_http_port=env.port_distributor.get_port(),
internal_http_port=env.port_distributor.get_port(),
# In these tests compute has high probability of terminating on its own # In these tests compute has high probability of terminating on its own
# before our stop() due to lost consensus leadership. # before our stop() due to lost consensus leadership.
check_stop_result=False, check_stop_result=False,