From da9c101939eded0063e78f32ea53f0a7e6a44aa1 Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Tue, 11 Feb 2025 12:02:22 -0600 Subject: [PATCH] Implement a second HTTP server within compute_ctl (#10574) The compute_ctl HTTP server has the following purposes: - Allow management via the control plane - Provide an endpoint for scaping metrics - Provide APIs for compute internal clients - Neon Postgres extension for installing remote extensions - local_proxy for installing extensions and adding grants The first two purposes require the HTTP server to be available outside the compute. The Neon threat model is a bad actor within our internal network. We need to reduce the surface area of attack. By exposing unnecessary unauthenticated HTTP endpoints to the internal network, we increase the surface area of attack. For endpoints described in the third bullet point, we can just run an extra HTTP server, which is only bound to the loopback interface since all consumers of those endpoints are within the compute. --- compute/patches/pg_hint_plan_v16.patch | 8 +- compute/patches/pg_hint_plan_v17.patch | 8 +- compute_tools/src/bin/compute_ctl.rs | 31 ++- compute_tools/src/compute.rs | 10 +- compute_tools/src/http/mod.rs | 4 +- compute_tools/src/http/server.rs | 195 ++++++++++++------ control_plane/src/bin/neon_local.rs | 7 +- control_plane/src/endpoint.rs | 130 +++++++----- proxy/src/bin/local_proxy.rs | 4 +- test_runner/fixtures/endpoint/http.py | 18 +- test_runner/fixtures/neon_cli.py | 9 +- test_runner/fixtures/neon_fixtures.py | 18 +- test_runner/performance/test_lazy_startup.py | 4 +- test_runner/performance/test_startup.py | 4 +- test_runner/regress/test_neon_local_cli.py | 24 ++- .../regress/test_wal_acceptor_async.py | 3 +- 16 files changed, 310 insertions(+), 167 deletions(-) diff --git a/compute/patches/pg_hint_plan_v16.patch b/compute/patches/pg_hint_plan_v16.patch index 4039a036df..1fc3ffa609 100644 --- a/compute/patches/pg_hint_plan_v16.patch +++ b/compute/patches/pg_hint_plan_v16.patch @@ -6,16 +6,16 @@ index da723b8..5328114 100644 ---- -- No.A-1-1-3 CREATE EXTENSION pg_hint_plan; -+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan ++LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan -- No.A-1-2-3 DROP EXTENSION pg_hint_plan; -- No.A-1-1-4 CREATE SCHEMA other_schema; CREATE EXTENSION pg_hint_plan SCHEMA other_schema; -+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan ++LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan" CREATE EXTENSION pg_hint_plan; -+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan ++LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan DROP SCHEMA other_schema; ---- ---- No. A-5-1 comment pattern @@ -35,7 +35,7 @@ index d372459..6282afe 100644 SET client_min_messages TO LOG; SET pg_hint_plan.enable_hint TO on; CREATE EXTENSION file_fdw; -+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/file_fdw ++LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/file_fdw CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw; CREATE USER MAPPING FOR PUBLIC SERVER file_server; CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename'); diff --git a/compute/patches/pg_hint_plan_v17.patch b/compute/patches/pg_hint_plan_v17.patch index dbf4e470ea..3442a094eb 100644 --- a/compute/patches/pg_hint_plan_v17.patch +++ b/compute/patches/pg_hint_plan_v17.patch @@ -6,16 +6,16 @@ index e7d68a1..65a056c 100644 ---- -- No.A-1-1-3 CREATE EXTENSION pg_hint_plan; -+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan ++LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan -- No.A-1-2-3 DROP EXTENSION pg_hint_plan; -- No.A-1-1-4 CREATE SCHEMA other_schema; CREATE EXTENSION pg_hint_plan SCHEMA other_schema; -+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan ++LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan" CREATE EXTENSION pg_hint_plan; -+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan ++LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan DROP SCHEMA other_schema; ---- ---- No. A-5-1 comment pattern @@ -168,7 +168,7 @@ index 017fa4b..98d989b 100644 SET client_min_messages TO LOG; SET pg_hint_plan.enable_hint TO on; CREATE EXTENSION file_fdw; -+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/file_fdw ++LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/file_fdw CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw; CREATE USER MAPPING FOR PUBLIC SERVER file_server; CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename'); diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 275f345897..df47adda6c 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -48,6 +48,7 @@ use anyhow::{Context, Result}; use chrono::Utc; use clap::Parser; use compute_tools::disk_quota::set_disk_quota; +use compute_tools::http::server::Server; use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static; use signal_hook::consts::{SIGQUIT, SIGTERM}; use signal_hook::{consts::SIGINT, iterator::Signals}; @@ -62,7 +63,6 @@ use compute_tools::compute::{ }; use compute_tools::configurator::launch_configurator; use compute_tools::extension_server::get_pg_version_string; -use compute_tools::http::launch_http_server; use compute_tools::logger::*; use compute_tools::monitor::launch_monitor; use compute_tools::params::*; @@ -108,8 +108,20 @@ struct Cli { #[arg(short = 'r', long, value_parser = parse_remote_ext_config)] pub remote_ext_config: Option, - #[arg(long, default_value_t = 3080)] - pub http_port: u16, + /// The port to bind the external listening HTTP server to. Clients running + /// outside the compute will talk to the compute through this port. Keep + /// the previous name for this argument around for a smoother release + /// with the control plane. + /// + /// TODO: Remove the alias after the control plane release which teaches the + /// control plane about the renamed argument. + #[arg(long, alias = "http-port", default_value_t = 3080)] + pub external_http_port: u16, + + /// The port to bind the internal listening HTTP server to. Clients like + /// the neon extension (for installing remote extensions) and local_proxy. + #[arg(long)] + pub internal_http_port: Option, #[arg(short = 'D', long, value_name = "DATADIR")] pub pgdata: String, @@ -340,7 +352,8 @@ fn wait_spec( pgdata: cli.pgdata.clone(), pgbin: cli.pgbin.clone(), pgversion: get_pg_version_string(&cli.pgbin), - http_port: cli.http_port, + external_http_port: cli.external_http_port, + internal_http_port: cli.internal_http_port.unwrap_or(cli.external_http_port + 1), live_config_allowed, state: Mutex::new(new_state), state_changed: Condvar::new(), @@ -358,9 +371,13 @@ fn wait_spec( compute.prewarm_postgres()?; } - // Launch http service first, so that we can serve control-plane requests - // while configuration is still in progress. - let _http_handle = launch_http_server(cli.http_port, &compute); + // Launch the external HTTP server first, so that we can serve control plane + // requests while configuration is still in progress. + Server::External(cli.external_http_port).launch(&compute); + + // The internal HTTP server could be launched later, but there isn't much + // sense in waiting. + Server::Internal(cli.internal_http_port.unwrap_or(cli.external_http_port + 1)).launch(&compute); if !spec_set { // No spec provided, hang waiting for it. diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 7fc54bb490..cadc6f84d1 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -82,8 +82,10 @@ pub struct ComputeNode { /// - we push spec and it does configuration /// - but then it is restarted without any spec again pub live_config_allowed: bool, - /// The port that the compute's HTTP server listens on - pub http_port: u16, + /// The port that the compute's external HTTP server listens on + pub external_http_port: u16, + /// The port that the compute's internal HTTP server listens on + pub internal_http_port: u16, /// Volatile part of the `ComputeNode`, which should be used under `Mutex`. /// To allow HTTP API server to serving status requests, while configuration /// is in progress, lock should be held only for short periods of time to do @@ -631,7 +633,7 @@ impl ComputeNode { config::write_postgres_conf( &pgdata_path.join("postgresql.conf"), &pspec.spec, - self.http_port, + self.internal_http_port, )?; // Syncing safekeepers is only safe with primary nodes: if a primary @@ -1396,7 +1398,7 @@ impl ComputeNode { // Write new config let pgdata_path = Path::new(&self.pgdata); let postgresql_conf_path = pgdata_path.join("postgresql.conf"); - config::write_postgres_conf(&postgresql_conf_path, &spec, self.http_port)?; + config::write_postgres_conf(&postgresql_conf_path, &spec, self.internal_http_port)?; let max_concurrent_connections = spec.reconfigure_concurrency; diff --git a/compute_tools/src/http/mod.rs b/compute_tools/src/http/mod.rs index a596bea504..93eb6ef5b7 100644 --- a/compute_tools/src/http/mod.rs +++ b/compute_tools/src/http/mod.rs @@ -4,11 +4,9 @@ use http::{header::CONTENT_TYPE, StatusCode}; use serde::Serialize; use tracing::error; -pub use server::launch_http_server; - mod extract; mod routes; -mod server; +pub mod server; /// Convenience response builder for JSON responses struct JsonResponse; diff --git a/compute_tools/src/http/server.rs b/compute_tools/src/http/server.rs index 19dded5172..a523ecd96f 100644 --- a/compute_tools/src/http/server.rs +++ b/compute_tools/src/http/server.rs @@ -1,9 +1,11 @@ use std::{ + fmt::Display, net::{IpAddr, Ipv6Addr, SocketAddr}, sync::Arc, time::Duration, }; +use anyhow::Result; use axum::{ extract::Request, middleware::{self, Next}, @@ -24,45 +26,65 @@ use super::routes::{ }; use crate::compute::ComputeNode; -async fn handle_404() -> Response { - StatusCode::NOT_FOUND.into_response() -} - const X_REQUEST_ID: &str = "x-request-id"; -/// This middleware function allows compute_ctl to generate its own request ID -/// if one isn't supplied. The control plane will always send one as a UUID. The -/// neon Postgres extension on the other hand does not send one. -async fn maybe_add_request_id_header(mut request: Request, next: Next) -> Response { - let headers = request.headers_mut(); - - if headers.get(X_REQUEST_ID).is_none() { - headers.append(X_REQUEST_ID, Uuid::new_v4().to_string().parse().unwrap()); - } - - next.run(request).await +/// `compute_ctl` has two servers: internal and external. The internal server +/// binds to the loopback interface and handles communication from clients on +/// the compute. The external server is what receives communication from the +/// control plane, the metrics scraper, etc. We make the distinction because +/// certain routes in `compute_ctl` only need to be exposed to local processes +/// like Postgres via the neon extension and local_proxy. +#[derive(Clone, Copy, Debug)] +pub enum Server { + Internal(u16), + External(u16), } -/// Run the HTTP server and wait on it forever. -async fn serve(port: u16, compute: Arc) { - let mut app = Router::new() - .route("/check_writability", post(check_writability::is_writable)) - .route("/configure", post(configure::configure)) - .route("/database_schema", get(database_schema::get_schema_dump)) - .route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects)) - .route( - "/extension_server/{*filename}", - post(extension_server::download_extension), - ) - .route("/extensions", post(extensions::install_extension)) - .route("/grants", post(grants::add_grant)) - .route("/insights", get(insights::get_insights)) - .route("/metrics", get(metrics::get_metrics)) - .route("/metrics.json", get(metrics_json::get_metrics)) - .route("/status", get(status::get_status)) - .route("/terminate", post(terminate::terminate)) - .fallback(handle_404) - .layer( +impl Display for Server { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Server::Internal(_) => f.write_str("internal"), + Server::External(_) => f.write_str("external"), + } + } +} + +impl From for Router> { + fn from(server: Server) -> Self { + let mut router = Router::>::new(); + + router = match server { + Server::Internal(_) => { + router = router + .route( + "/extension_server/{*filename}", + post(extension_server::download_extension), + ) + .route("/extensions", post(extensions::install_extension)) + .route("/grants", post(grants::add_grant)); + + // Add in any testing support + if cfg!(feature = "testing") { + use super::routes::failpoints; + + router = router.route("/failpoints", post(failpoints::configure_failpoints)); + } + + router + } + Server::External(_) => router + .route("/check_writability", post(check_writability::is_writable)) + .route("/configure", post(configure::configure)) + .route("/database_schema", get(database_schema::get_schema_dump)) + .route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects)) + .route("/insights", get(insights::get_insights)) + .route("/metrics", get(metrics::get_metrics)) + .route("/metrics.json", get(metrics_json::get_metrics)) + .route("/status", get(status::get_status)) + .route("/terminate", post(terminate::terminate)), + }; + + router.fallback(Server::handle_404).method_not_allowed_fallback(Server::handle_405).layer( ServiceBuilder::new() // Add this middleware since we assume the request ID exists .layer(middleware::from_fn(maybe_add_request_id_header)) @@ -102,43 +124,88 @@ async fn serve(port: u16, compute: Arc) { ) .layer(PropagateRequestIdLayer::x_request_id()), ) - .with_state(compute); + } +} - // Add in any testing support - if cfg!(feature = "testing") { - use super::routes::failpoints; - - app = app.route("/failpoints", post(failpoints::configure_failpoints)) +impl Server { + async fn handle_404() -> impl IntoResponse { + StatusCode::NOT_FOUND } - // This usually binds to both IPv4 and IPv6 on Linux, see - // https://github.com/rust-lang/rust/pull/34440 for more information - let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port); - let listener = match TcpListener::bind(&addr).await { - Ok(listener) => listener, - Err(e) => { - error!( - "failed to bind the compute_ctl HTTP server to port {}: {}", - port, e - ); - return; + async fn handle_405() -> impl IntoResponse { + StatusCode::METHOD_NOT_ALLOWED + } + + async fn listener(&self) -> Result { + let addr = SocketAddr::new(self.ip(), self.port()); + let listener = TcpListener::bind(&addr).await?; + + Ok(listener) + } + + fn ip(&self) -> IpAddr { + match self { + // TODO: Change this to Ipv6Addr::LOCALHOST when the GitHub runners + // allow binding to localhost + Server::Internal(_) => IpAddr::from(Ipv6Addr::UNSPECIFIED), + Server::External(_) => IpAddr::from(Ipv6Addr::UNSPECIFIED), } - }; - - if let Ok(local_addr) = listener.local_addr() { - info!("compute_ctl HTTP server listening on {}", local_addr); - } else { - info!("compute_ctl HTTP server listening on port {}", port); } - if let Err(e) = axum::serve(listener, app).await { - error!("compute_ctl HTTP server error: {}", e); + fn port(self) -> u16 { + match self { + Server::Internal(port) => port, + Server::External(port) => port, + } + } + + async fn serve(self, compute: Arc) { + let listener = self.listener().await.unwrap_or_else(|e| { + // If we can't bind, the compute cannot operate correctly + panic!( + "failed to bind the compute_ctl {} HTTP server to {}: {}", + self, + SocketAddr::new(self.ip(), self.port()), + e + ); + }); + + if tracing::enabled!(tracing::Level::INFO) { + let local_addr = match listener.local_addr() { + Ok(local_addr) => local_addr, + Err(_) => SocketAddr::new(self.ip(), self.port()), + }; + + info!( + "compute_ctl {} HTTP server listening at {}", + self, local_addr + ); + } + + let router = Router::from(self).with_state(compute); + + if let Err(e) = axum::serve(listener, router).await { + error!("compute_ctl {} HTTP server error: {}", self, e); + } + } + + pub fn launch(self, compute: &Arc) { + let state = Arc::clone(compute); + + info!("Launching the {} server", self); + + tokio::spawn(self.serve(state)); } } -/// Launch HTTP server in a new task and return its `JoinHandle`. -pub fn launch_http_server(port: u16, state: &Arc) -> tokio::task::JoinHandle<()> { - let state = Arc::clone(state); +/// This middleware function allows compute_ctl to generate its own request ID +/// if one isn't supplied. The control plane will always send one as a UUID. The +/// neon Postgres extension on the other hand does not send one. +async fn maybe_add_request_id_header(mut request: Request, next: Next) -> Response { + let headers = request.headers_mut(); + if headers.get(X_REQUEST_ID).is_none() { + headers.append(X_REQUEST_ID, Uuid::new_v4().to_string().parse().unwrap()); + } - tokio::spawn(serve(port, state)) + next.run(request).await } diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index ba67ffa2dd..02d793400a 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -552,8 +552,10 @@ struct EndpointCreateCmdArgs { lsn: Option, #[clap(long)] pg_port: Option, + #[clap(long, alias = "http-port")] + external_http_port: Option, #[clap(long)] - http_port: Option, + internal_http_port: Option, #[clap(long = "pageserver-id")] endpoint_pageserver_id: Option, @@ -1353,7 +1355,8 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res tenant_id, timeline_id, args.pg_port, - args.http_port, + args.external_http_port, + args.internal_http_port, args.pg_version, mode, !args.update_catalog, diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 6ee6f8f1ec..3b2634204c 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -37,6 +37,8 @@ //! ``` //! use std::collections::BTreeMap; +use std::net::IpAddr; +use std::net::Ipv4Addr; use std::net::SocketAddr; use std::net::TcpStream; use std::path::PathBuf; @@ -73,7 +75,8 @@ pub struct EndpointConf { timeline_id: TimelineId, mode: ComputeMode, pg_port: u16, - http_port: u16, + external_http_port: u16, + internal_http_port: u16, pg_version: u32, skip_pg_catalog_updates: bool, drop_subscriptions_before_start: bool, @@ -128,7 +131,7 @@ impl ComputeControlPlane { 1 + self .endpoints .values() - .map(|ep| std::cmp::max(ep.pg_address.port(), ep.http_address.port())) + .map(|ep| std::cmp::max(ep.pg_address.port(), ep.external_http_address.port())) .max() .unwrap_or(self.base_port) } @@ -140,18 +143,27 @@ impl ComputeControlPlane { tenant_id: TenantId, timeline_id: TimelineId, pg_port: Option, - http_port: Option, + external_http_port: Option, + internal_http_port: Option, pg_version: u32, mode: ComputeMode, skip_pg_catalog_updates: bool, drop_subscriptions_before_start: bool, ) -> Result> { let pg_port = pg_port.unwrap_or_else(|| self.get_port()); - let http_port = http_port.unwrap_or_else(|| self.get_port() + 1); + let external_http_port = external_http_port.unwrap_or_else(|| self.get_port() + 1); + let internal_http_port = internal_http_port.unwrap_or_else(|| external_http_port + 1); let ep = Arc::new(Endpoint { endpoint_id: endpoint_id.to_owned(), - pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port), - http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port), + pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), pg_port), + external_http_address: SocketAddr::new( + IpAddr::from(Ipv4Addr::UNSPECIFIED), + external_http_port, + ), + internal_http_address: SocketAddr::new( + IpAddr::from(Ipv4Addr::LOCALHOST), + internal_http_port, + ), env: self.env.clone(), timeline_id, mode, @@ -176,7 +188,8 @@ impl ComputeControlPlane { tenant_id, timeline_id, mode, - http_port, + external_http_port, + internal_http_port, pg_port, pg_version, skip_pg_catalog_updates, @@ -230,9 +243,10 @@ pub struct Endpoint { pub timeline_id: TimelineId, pub mode: ComputeMode, - // port and address of the Postgres server and `compute_ctl`'s HTTP API + // port and address of the Postgres server and `compute_ctl`'s HTTP APIs pub pg_address: SocketAddr, - pub http_address: SocketAddr, + pub external_http_address: SocketAddr, + pub internal_http_address: SocketAddr, // postgres major version in the format: 14, 15, etc. pg_version: u32, @@ -287,8 +301,15 @@ impl Endpoint { serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?; Ok(Endpoint { - pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port), - http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port), + pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), conf.pg_port), + external_http_address: SocketAddr::new( + IpAddr::from(Ipv4Addr::UNSPECIFIED), + conf.external_http_port, + ), + internal_http_address: SocketAddr::new( + IpAddr::from(Ipv4Addr::LOCALHOST), + conf.internal_http_port, + ), endpoint_id, env: env.clone(), timeline_id: conf.timeline_id, @@ -650,40 +671,51 @@ impl Endpoint { println!("Also at '{}'", conn_str); } let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl")); - cmd.args(["--http-port", &self.http_address.port().to_string()]) - .args(["--pgdata", self.pgdata().to_str().unwrap()]) - .args(["--connstr", &conn_str]) - .args([ - "--spec-path", - self.endpoint_path().join("spec.json").to_str().unwrap(), - ]) - .args([ - "--pgbin", - self.env - .pg_bin_dir(self.pg_version)? - .join("postgres") - .to_str() - .unwrap(), - ]) - // TODO: It would be nice if we generated compute IDs with the same - // algorithm as the real control plane. - // - // TODO: Add this back when - // https://github.com/neondatabase/neon/pull/10747 is merged. - // - //.args([ - // "--compute-id", - // &format!( - // "compute-{}", - // SystemTime::now() - // .duration_since(UNIX_EPOCH) - // .unwrap() - // .as_secs() - // ), - //]) - .stdin(std::process::Stdio::null()) - .stderr(logfile.try_clone()?) - .stdout(logfile); + //cmd.args([ + // "--external-http-port", + // &self.external_http_address.port().to_string(), + //]) + //.args([ + // "--internal-http-port", + // &self.internal_http_address.port().to_string(), + //]) + cmd.args([ + "--http-port", + &self.external_http_address.port().to_string(), + ]) + .args(["--pgdata", self.pgdata().to_str().unwrap()]) + .args(["--connstr", &conn_str]) + .args([ + "--spec-path", + self.endpoint_path().join("spec.json").to_str().unwrap(), + ]) + .args([ + "--pgbin", + self.env + .pg_bin_dir(self.pg_version)? + .join("postgres") + .to_str() + .unwrap(), + ]) + // TODO: It would be nice if we generated compute IDs with the same + // algorithm as the real control plane. + // + // TODO: Add this back when + // https://github.com/neondatabase/neon/pull/10747 is merged. + // + //.args([ + // "--compute-id", + // &format!( + // "compute-{}", + // SystemTime::now() + // .duration_since(UNIX_EPOCH) + // .unwrap() + // .as_secs() + // ), + //]) + .stdin(std::process::Stdio::null()) + .stderr(logfile.try_clone()?) + .stdout(logfile); if let Some(remote_ext_config) = remote_ext_config { cmd.args(["--remote-ext-config", remote_ext_config]); @@ -770,8 +802,8 @@ impl Endpoint { reqwest::Method::GET, format!( "http://{}:{}/status", - self.http_address.ip(), - self.http_address.port() + self.external_http_address.ip(), + self.external_http_address.port() ), ) .send() @@ -844,8 +876,8 @@ impl Endpoint { let response = client .post(format!( "http://{}:{}/configure", - self.http_address.ip(), - self.http_address.port() + self.external_http_address.ip(), + self.external_http_address.port() )) .header(CONTENT_TYPE.as_str(), "application/json") .body(format!( diff --git a/proxy/src/bin/local_proxy.rs b/proxy/src/bin/local_proxy.rs index 7a855bf54b..8d8a4c124a 100644 --- a/proxy/src/bin/local_proxy.rs +++ b/proxy/src/bin/local_proxy.rs @@ -85,8 +85,8 @@ struct LocalProxyCliArgs { /// Address of the postgres server #[clap(long, default_value = "127.0.0.1:5432")] postgres: SocketAddr, - /// Address of the compute-ctl api service - #[clap(long, default_value = "http://127.0.0.1:3080/")] + /// Address of the internal compute-ctl api service + #[clap(long, default_value = "http://127.0.0.1:3081/")] compute_ctl: ApiUrl, /// Path of the local proxy config file #[clap(long, default_value = "./local_proxy.json")] diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py index 6e8210e978..cdc162fca2 100644 --- a/test_runner/fixtures/endpoint/http.py +++ b/test_runner/fixtures/endpoint/http.py @@ -9,21 +9,23 @@ from requests.adapters import HTTPAdapter class EndpointHttpClient(requests.Session): def __init__( self, - port: int, + external_port: int, + internal_port: int, ): super().__init__() - self.port = port + self.external_port: int = external_port + self.internal_port: int = internal_port self.mount("http://", HTTPAdapter()) def dbs_and_roles(self): - res = self.get(f"http://localhost:{self.port}/dbs_and_roles") + res = self.get(f"http://localhost:{self.external_port}/dbs_and_roles") res.raise_for_status() return res.json() def database_schema(self, database: str): res = self.get( - f"http://localhost:{self.port}/database_schema?database={urllib.parse.quote(database, safe='')}" + f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}" ) res.raise_for_status() return res.text @@ -34,20 +36,20 @@ class EndpointHttpClient(requests.Session): "version": version, "database": database, } - res = self.post(f"http://localhost:{self.port}/extensions", json=body) + res = self.post(f"http://localhost:{self.internal_port}/extensions", json=body) res.raise_for_status() return res.json() def set_role_grants(self, database: str, role: str, schema: str, privileges: list[str]): res = self.post( - f"http://localhost:{self.port}/grants", + f"http://localhost:{self.internal_port}/grants", json={"database": database, "schema": schema, "role": role, "privileges": privileges}, ) res.raise_for_status() return res.json() def metrics(self) -> str: - res = self.get(f"http://localhost:{self.port}/metrics") + res = self.get(f"http://localhost:{self.external_port}/metrics") res.raise_for_status() return res.text @@ -62,5 +64,5 @@ class EndpointHttpClient(requests.Session): } ) - res = self.post(f"http://localhost:{self.port}/failpoints", json=body) + res = self.post(f"http://localhost:{self.internal_port}/failpoints", json=body) res.raise_for_status() diff --git a/test_runner/fixtures/neon_cli.py b/test_runner/fixtures/neon_cli.py index 33d422c590..6a016d2621 100644 --- a/test_runner/fixtures/neon_cli.py +++ b/test_runner/fixtures/neon_cli.py @@ -478,7 +478,8 @@ class NeonLocalCli(AbstractNeonCli): self, branch_name: str, pg_port: int, - http_port: int, + external_http_port: int, + internal_http_port: int, tenant_id: TenantId, pg_version: PgVersion, endpoint_id: str | None = None, @@ -501,8 +502,10 @@ class NeonLocalCli(AbstractNeonCli): args.extend(["--lsn", str(lsn)]) if pg_port is not None: args.extend(["--pg-port", str(pg_port)]) - if http_port is not None: - args.extend(["--http-port", str(http_port)]) + if external_http_port is not None: + args.extend(["--external-http-port", str(external_http_port)]) + if internal_http_port is not None: + args.extend(["--internal-http-port", str(internal_http_port)]) if endpoint_id is not None: args.append(endpoint_id) if hot_standby: diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 3d3a445b97..41e9952b8a 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3807,7 +3807,8 @@ class Endpoint(PgProtocol, LogUtils): env: NeonEnv, tenant_id: TenantId, pg_port: int, - http_port: int, + external_http_port: int, + internal_http_port: int, check_stop_result: bool = True, ): super().__init__(host="localhost", port=pg_port, user="cloud_admin", dbname="postgres") @@ -3817,7 +3818,8 @@ class Endpoint(PgProtocol, LogUtils): self.pgdata_dir: Path | None = None # Path to computenode PGDATA self.tenant_id = tenant_id self.pg_port = pg_port - self.http_port = http_port + self.external_http_port = external_http_port + self.internal_http_port = internal_http_port self.check_stop_result = check_stop_result # passed to endpoint create and endpoint reconfigure self.active_safekeepers: list[int] = list(map(lambda sk: sk.id, env.safekeepers)) @@ -3834,7 +3836,8 @@ class Endpoint(PgProtocol, LogUtils): self, auth_token: str | None = None, retries: Retry | None = None ) -> EndpointHttpClient: return EndpointHttpClient( - port=self.http_port, + external_port=self.external_http_port, + internal_port=self.internal_http_port, ) def create( @@ -3866,7 +3869,8 @@ class Endpoint(PgProtocol, LogUtils): lsn=lsn, hot_standby=hot_standby, pg_port=self.pg_port, - http_port=self.http_port, + external_http_port=self.external_http_port, + internal_http_port=self.internal_http_port, pg_version=self.env.pg_version, pageserver_id=pageserver_id, allow_multiple=allow_multiple, @@ -4258,7 +4262,8 @@ class EndpointFactory: self.env, tenant_id=tenant_id or self.env.initial_tenant, pg_port=self.env.port_distributor.get_port(), - http_port=self.env.port_distributor.get_port(), + external_http_port=self.env.port_distributor.get_port(), + internal_http_port=self.env.port_distributor.get_port(), ) self.num_instances += 1 self.endpoints.append(ep) @@ -4288,7 +4293,8 @@ class EndpointFactory: self.env, tenant_id=tenant_id or self.env.initial_tenant, pg_port=self.env.port_distributor.get_port(), - http_port=self.env.port_distributor.get_port(), + external_http_port=self.env.port_distributor.get_port(), + internal_http_port=self.env.port_distributor.get_port(), ) endpoint_id = endpoint_id or self.env.generate_endpoint_id() diff --git a/test_runner/performance/test_lazy_startup.py b/test_runner/performance/test_lazy_startup.py index 704073fe3b..3bf3ef890f 100644 --- a/test_runner/performance/test_lazy_startup.py +++ b/test_runner/performance/test_lazy_startup.py @@ -79,7 +79,9 @@ def test_lazy_startup(slru: str, neon_env_builder: NeonEnvBuilder, zenbenchmark: assert sum == 1000000 # Get metrics - metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json() + metrics = requests.get( + f"http://localhost:{endpoint.external_http_port}/metrics.json" + ).json() durations = { "wait_for_spec_ms": f"{slru}_{i}_wait_for_spec", "sync_safekeepers_ms": f"{slru}_{i}_sync_safekeepers", diff --git a/test_runner/performance/test_startup.py b/test_runner/performance/test_startup.py index d051717e92..60d8b5be30 100644 --- a/test_runner/performance/test_startup.py +++ b/test_runner/performance/test_startup.py @@ -56,7 +56,9 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc endpoint.safe_psql("select 1;") # Get metrics - metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json() + metrics = requests.get( + f"http://localhost:{endpoint.external_http_port}/metrics.json" + ).json() durations = { "wait_for_spec_ms": f"{i}_wait_for_spec", "sync_safekeepers_ms": f"{i}_sync_safekeepers", diff --git a/test_runner/regress/test_neon_local_cli.py b/test_runner/regress/test_neon_local_cli.py index 80e26d9432..8d9aab6848 100644 --- a/test_runner/regress/test_neon_local_cli.py +++ b/test_runner/regress/test_neon_local_cli.py @@ -17,11 +17,13 @@ def test_neon_cli_basics(neon_env_builder: NeonEnvBuilder, port_distributor: Por main_branch_name = "main" pg_port = port_distributor.get_port() - http_port = port_distributor.get_port() + external_http_port = port_distributor.get_port() + internal_http_port = port_distributor.get_port() env.neon_cli.endpoint_create( main_branch_name, pg_port, - http_port, + external_http_port, + internal_http_port, endpoint_id="ep-basic-main", tenant_id=env.initial_tenant, pg_version=env.pg_version, @@ -35,11 +37,13 @@ def test_neon_cli_basics(neon_env_builder: NeonEnvBuilder, port_distributor: Por new_branch_name=branch_name, ) pg_port = port_distributor.get_port() - http_port = port_distributor.get_port() + external_http_port = port_distributor.get_port() + internal_http_port = port_distributor.get_port() env.neon_cli.endpoint_create( branch_name, pg_port, - http_port, + external_http_port, + internal_http_port, endpoint_id=f"ep-{branch_name}", tenant_id=env.initial_tenant, pg_version=env.pg_version, @@ -59,23 +63,27 @@ def test_neon_two_primary_endpoints_fail( branch_name = "main" pg_port = port_distributor.get_port() - http_port = port_distributor.get_port() + external_http_port = port_distributor.get_port() + internal_http_port = port_distributor.get_port() env.neon_cli.endpoint_create( branch_name, pg_port, - http_port, + external_http_port, + internal_http_port, endpoint_id="ep1", tenant_id=env.initial_tenant, pg_version=env.pg_version, ) pg_port = port_distributor.get_port() - http_port = port_distributor.get_port() + external_http_port = port_distributor.get_port() + internal_http_port = port_distributor.get_port() # ep1 is not running so create will succeed env.neon_cli.endpoint_create( branch_name, pg_port, - http_port, + external_http_port, + internal_http_port, endpoint_id="ep2", tenant_id=env.initial_tenant, pg_version=env.pg_version, diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index b32b028fa1..936c774657 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -268,7 +268,8 @@ def endpoint_create_start( env, tenant_id=env.initial_tenant, pg_port=env.port_distributor.get_port(), - http_port=env.port_distributor.get_port(), + external_http_port=env.port_distributor.get_port(), + internal_http_port=env.port_distributor.get_port(), # In these tests compute has high probability of terminating on its own # before our stop() due to lost consensus leadership. check_stop_result=False,