diff --git a/Cargo.lock b/Cargo.lock index 030753bca5..772b1f50c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -783,6 +783,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-extra" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460fc6f625a1f7705c6cf62d0d070794e94668988b1c38111baeec177c715f7b" +dependencies = [ + "axum", + "axum-core", + "bytes", + "futures-util", + "headers", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "serde", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + [[package]] name = "azure_core" version = "0.21.0" @@ -925,9 +947,9 @@ checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5" [[package]] name = "base64" -version = "0.21.1" +version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f1e31e207a6b8fb791a38ea3105e6cb541f55e4d029902d3039a4ad07cc4105" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" [[package]] name = "base64" @@ -1305,6 +1327,7 @@ dependencies = [ "aws-sdk-s3", "aws-smithy-types", "axum", + "axum-extra", "base64 0.13.1", "bytes", "camino", @@ -1316,6 +1339,7 @@ dependencies = [ "flate2", "futures", "http 1.1.0", + "jsonwebtoken", "metrics", "nix 0.27.1", "notify", @@ -2297,7 +2321,7 @@ name = "framed-websockets" version = "0.1.0" source = "git+https://github.com/neondatabase/framed-websockets#34eff3d6f8cfccbc5f35e4f65314ff7328621127" dependencies = [ - "base64 0.21.1", + "base64 0.21.7", "bytemuck", "bytes", "futures-core", @@ -2653,7 +2677,7 @@ version = "7.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" dependencies = [ - "base64 0.21.1", + "base64 0.21.7", "byteorder", "crossbeam-channel", "flate2", @@ -2661,6 +2685,30 @@ dependencies = [ "num-traits", ] +[[package]] +name = "headers" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http 1.1.0", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http 1.1.0", +] + [[package]] name = "heck" version = "0.5.0" @@ -3385,7 +3433,7 @@ version = "9.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c7ea04a7c5c055c175f189b6dc6ba036fd62306b58c66c9f6389036c503a3f4" dependencies = [ - "base64 0.21.1", + "base64 0.21.7", "js-sys", "pem", "ring", @@ -4467,7 +4515,7 @@ version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310" dependencies = [ - "base64 0.21.1", + "base64 0.21.7", "serde", ] @@ -5814,7 +5862,7 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" dependencies = [ - "base64 0.21.1", + "base64 0.21.7", ] [[package]] @@ -5823,7 +5871,7 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f48172685e6ff52a556baa527774f61fcaa884f59daf3375c62a3f1cd2549dab" dependencies = [ - "base64 0.21.1", + "base64 0.21.7", "rustls-pki-types", ] @@ -7357,10 +7405,12 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697" dependencies = [ + "base64 0.22.1", "bitflags 2.8.0", "bytes", "http 1.1.0", "http-body 1.0.0", + "mime", "pin-project-lite", "tower-layer", "tower-service", @@ -8267,7 +8317,7 @@ dependencies = [ "ahash", "anyhow", "base64 0.13.1", - "base64 0.21.1", + "base64 0.21.7", "base64ct", "bytes", "camino", diff --git a/Cargo.toml b/Cargo.toml index 2303723e43..d11fe4f449 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,6 +67,7 @@ aws-credential-types = "1.2.0" aws-sigv4 = { version = "1.2", features = ["sign-http"] } aws-types = "1.3" axum = { version = "0.8.1", features = ["ws"] } +axum-extra = { version = "0.10.0", features = ["typed-header"] } base64 = "0.13.0" bincode = "1.3" bindgen = "0.71" @@ -191,7 +192,7 @@ toml = "0.8" toml_edit = "0.22" tonic = {version = "0.12.3", default-features = false, features = ["channel", "tls", "tls-roots"]} tower = { version = "0.5.2", default-features = false } -tower-http = { version = "0.6.2", features = ["request-id", "trace"] } +tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] } # This revision uses opentelemetry 0.27. There's no tag for it. tower-otel = { git = "https://github.com/mattiapenati/tower-otel", rev = "56a7321053bcb72443888257b622ba0d43a11fcd" } diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 8f3bcbeef8..dd2896714d 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -17,6 +17,7 @@ aws-sdk-kms.workspace = true aws-smithy-types.workspace = true anyhow.workspace = true axum = { workspace = true, features = [] } +axum-extra.workspace = true camino.workspace = true chrono.workspace = true cfg-if.workspace = true @@ -25,6 +26,7 @@ fail.workspace = true flate2.workspace = true futures.workspace = true http.workspace = true +jsonwebtoken.workspace = true metrics.workspace = true nix.workspace = true notify.workspace = true diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 08966a6efb..fc7a3e2827 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -179,6 +179,7 @@ fn main() -> Result<()> { live_config_allowed: cli_spec.live_config_allowed, }, cli_spec.spec, + cli_spec.compute_ctl_config, )?; let exit_code = compute_node.run()?; diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index e4d5a6aaba..d0b1bc2534 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -11,7 +11,7 @@ use std::{env, fs}; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use compute_api::privilege::Privilege; -use compute_api::responses::{ComputeMetrics, ComputeStatus}; +use compute_api::responses::{ComputeCtlConfig, ComputeMetrics, ComputeStatus}; use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent}; use futures::StreamExt; use futures::future::join_all; @@ -132,6 +132,8 @@ pub struct ComputeState { /// passed by the control plane with a /configure HTTP request. pub pspec: Option, + pub compute_ctl_config: ComputeCtlConfig, + /// If the spec is passed by a /configure request, 'startup_span' is the /// /configure request's tracing span. The main thread enters it when it /// processes the compute startup, so that the compute startup is considered @@ -155,6 +157,7 @@ impl ComputeState { last_active: None, error: None, pspec: None, + compute_ctl_config: ComputeCtlConfig::default(), startup_span: None, metrics: ComputeMetrics::default(), } @@ -365,7 +368,11 @@ pub(crate) fn construct_superuser_query(spec: &ComputeSpec) -> String { } impl ComputeNode { - pub fn new(params: ComputeNodeParams, cli_spec: Option) -> Result { + pub fn new( + params: ComputeNodeParams, + cli_spec: Option, + compute_ctl_config: ComputeCtlConfig, + ) -> Result { let connstr = params.connstr.as_str(); let conn_conf = postgres::config::Config::from_str(connstr) .context("cannot build postgres config from connstr")?; @@ -377,6 +384,7 @@ impl ComputeNode { let pspec = ParsedSpec::try_from(cli_spec).map_err(|msg| anyhow::anyhow!(msg))?; new_state.pspec = Some(pspec); } + new_state.compute_ctl_config = compute_ctl_config; Ok(ComputeNode { params, @@ -405,11 +413,19 @@ impl ComputeNode { // Launch the external HTTP server first, so that we can serve control plane // requests while configuration is still in progress. - crate::http::server::Server::External(this.params.external_http_port).launch(&this); + crate::http::server::Server::External { + port: this.params.external_http_port, + jwks: this.state.lock().unwrap().compute_ctl_config.jwks.clone(), + compute_id: this.params.compute_id.clone(), + } + .launch(&this); // The internal HTTP server could be launched later, but there isn't much // sense in waiting. - crate::http::server::Server::Internal(this.params.internal_http_port).launch(&this); + crate::http::server::Server::Internal { + port: this.params.internal_http_port, + } + .launch(&this); // If we got a spec from the CLI already, use that. Otherwise wait for the // control plane to pass it to us with a /configure HTTP request diff --git a/compute_tools/src/http/extract/mod.rs b/compute_tools/src/http/extract/mod.rs index 1b690e444d..589681cfe2 100644 --- a/compute_tools/src/http/extract/mod.rs +++ b/compute_tools/src/http/extract/mod.rs @@ -1,7 +1,9 @@ pub(crate) mod json; pub(crate) mod path; pub(crate) mod query; +pub(crate) mod request_id; pub(crate) use json::Json; pub(crate) use path::Path; pub(crate) use query::Query; +pub(crate) use request_id::RequestId; diff --git a/compute_tools/src/http/extract/request_id.rs b/compute_tools/src/http/extract/request_id.rs new file mode 100644 index 0000000000..d911921a05 --- /dev/null +++ b/compute_tools/src/http/extract/request_id.rs @@ -0,0 +1,86 @@ +use std::{ + fmt::Display, + ops::{Deref, DerefMut}, +}; + +use axum::{extract::FromRequestParts, response::IntoResponse}; +use http::{StatusCode, request::Parts}; + +use crate::http::{JsonResponse, headers::X_REQUEST_ID}; + +/// Extract the request ID from the `X-Request-Id` header. +#[derive(Debug, Clone, Default)] +pub(crate) struct RequestId(pub String); + +#[derive(Debug)] +/// Rejection used for [`RequestId`]. +/// +/// Contains one variant for each way the [`RequestId`] extractor can +/// fail. +pub(crate) enum RequestIdRejection { + /// The request is missing the header. + MissingRequestId, + + /// The value of the header is invalid UTF-8. + InvalidUtf8, +} + +impl RequestIdRejection { + pub fn status(&self) -> StatusCode { + match self { + RequestIdRejection::MissingRequestId => StatusCode::INTERNAL_SERVER_ERROR, + RequestIdRejection::InvalidUtf8 => StatusCode::BAD_REQUEST, + } + } + + pub fn message(&self) -> String { + match self { + RequestIdRejection::MissingRequestId => "request ID is missing", + RequestIdRejection::InvalidUtf8 => "request ID is invalid UTF-8", + } + .to_string() + } +} + +impl IntoResponse for RequestIdRejection { + fn into_response(self) -> axum::response::Response { + JsonResponse::error(self.status(), self.message()) + } +} + +impl FromRequestParts for RequestId +where + S: Send + Sync, +{ + type Rejection = RequestIdRejection; + + async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result { + match parts.headers.get(X_REQUEST_ID) { + Some(value) => match value.to_str() { + Ok(request_id) => Ok(Self(request_id.to_string())), + Err(_) => Err(RequestIdRejection::InvalidUtf8), + }, + None => Err(RequestIdRejection::MissingRequestId), + } + } +} + +impl Deref for RequestId { + type Target = String; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for RequestId { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Display for RequestId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } +} diff --git a/compute_tools/src/http/headers.rs b/compute_tools/src/http/headers.rs new file mode 100644 index 0000000000..a11638e203 --- /dev/null +++ b/compute_tools/src/http/headers.rs @@ -0,0 +1,2 @@ +/// Constant for `X-Request-Id` header. +pub const X_REQUEST_ID: &str = "x-request-id"; diff --git a/compute_tools/src/http/middleware/authorize.rs b/compute_tools/src/http/middleware/authorize.rs new file mode 100644 index 0000000000..798dd1179b --- /dev/null +++ b/compute_tools/src/http/middleware/authorize.rs @@ -0,0 +1,145 @@ +use std::{collections::HashSet, net::SocketAddr}; + +use anyhow::{Result, anyhow}; +use axum::{RequestExt, body::Body, extract::ConnectInfo}; +use axum_extra::{ + TypedHeader, + headers::{Authorization, authorization::Bearer}, +}; +use futures::future::BoxFuture; +use http::{Request, Response, StatusCode}; +use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet}; +use serde::Deserialize; +use tower_http::auth::AsyncAuthorizeRequest; +use tracing::warn; + +use crate::http::{JsonResponse, extract::RequestId}; + +#[derive(Clone, Debug, Deserialize)] +pub(in crate::http) struct Claims { + compute_id: String, +} + +#[derive(Clone, Debug)] +pub(in crate::http) struct Authorize { + compute_id: String, + jwks: JwkSet, + validation: Validation, +} + +impl Authorize { + pub fn new(compute_id: String, jwks: JwkSet) -> Self { + let mut validation = Validation::new(Algorithm::EdDSA); + // Nothing is currently required + validation.required_spec_claims = HashSet::new(); + validation.validate_exp = true; + // Unused by the control plane + validation.validate_aud = false; + // Unused by the control plane + validation.validate_nbf = false; + + Self { + compute_id, + jwks, + validation, + } + } +} + +impl AsyncAuthorizeRequest for Authorize { + type RequestBody = Body; + type ResponseBody = Body; + type Future = BoxFuture<'static, Result, Response>>; + + fn authorize(&mut self, mut request: Request) -> Self::Future { + let compute_id = self.compute_id.clone(); + let jwks = self.jwks.clone(); + let validation = self.validation.clone(); + + Box::pin(async move { + let request_id = request.extract_parts::().await.unwrap(); + + // TODO: Remove this check after a successful rollout + if jwks.keys.is_empty() { + warn!(%request_id, "Authorization has not been configured"); + + return Ok(request); + } + + let connect_info = request + .extract_parts::>() + .await + .unwrap(); + + // In the event the request is coming from the loopback interface, + // allow all requests + if connect_info.ip().is_loopback() { + warn!(%request_id, "Bypassed authorization because request is coming from the loopback interface"); + + return Ok(request); + } + + let TypedHeader(Authorization(bearer)) = request + .extract_parts::>>() + .await + .map_err(|_| { + JsonResponse::error(StatusCode::BAD_REQUEST, "invalid authorization token") + })?; + + let data = match Self::verify(&jwks, bearer.token(), &validation) { + Ok(claims) => claims, + Err(e) => return Err(JsonResponse::error(StatusCode::UNAUTHORIZED, e)), + }; + + if data.claims.compute_id != compute_id { + return Err(JsonResponse::error( + StatusCode::UNAUTHORIZED, + "invalid claims in authorization token", + )); + } + + // Make claims available to any subsequent middleware or request + // handlers + request.extensions_mut().insert(data.claims); + + Ok(request) + }) + } +} + +impl Authorize { + /// Verify the token using the JSON Web Key set and return the token data. + fn verify(jwks: &JwkSet, token: &str, validation: &Validation) -> Result> { + debug_assert!(!jwks.keys.is_empty()); + + for jwk in jwks.keys.iter() { + let decoding_key = match DecodingKey::from_jwk(jwk) { + Ok(key) => key, + Err(e) => { + warn!( + "Failed to construct decoding key from {}: {}", + jwk.common.key_id.as_ref().unwrap(), + e + ); + + continue; + } + }; + + match jsonwebtoken::decode::(token, &decoding_key, validation) { + Ok(data) => return Ok(data), + Err(e) => { + warn!( + "Failed to decode authorization token using {}: {}", + jwk.common.key_id.as_ref().unwrap(), + e + ); + + continue; + } + } + } + + Err(anyhow!("Failed to verify authorization token")) + } +} diff --git a/compute_tools/src/http/middleware/mod.rs b/compute_tools/src/http/middleware/mod.rs new file mode 100644 index 0000000000..caeeeedfe5 --- /dev/null +++ b/compute_tools/src/http/middleware/mod.rs @@ -0,0 +1 @@ +pub(in crate::http) mod authorize; diff --git a/compute_tools/src/http/mod.rs b/compute_tools/src/http/mod.rs index d182278174..9ecc1b0093 100644 --- a/compute_tools/src/http/mod.rs +++ b/compute_tools/src/http/mod.rs @@ -7,6 +7,8 @@ use serde::Serialize; use tracing::error; mod extract; +mod headers; +mod middleware; mod routes; pub mod server; diff --git a/compute_tools/src/http/server.rs b/compute_tools/src/http/server.rs index 7283401bb5..126fa86d1c 100644 --- a/compute_tools/src/http/server.rs +++ b/compute_tools/src/http/server.rs @@ -10,48 +10,58 @@ use axum::middleware::{self, Next}; use axum::response::{IntoResponse, Response}; use axum::routing::{get, post}; use http::StatusCode; +use jsonwebtoken::jwk::JwkSet; use tokio::net::TcpListener; use tower::ServiceBuilder; -use tower_http::request_id::PropagateRequestIdLayer; -use tower_http::trace::TraceLayer; -use tracing::{Span, debug, error, info}; +use tower_http::{ + auth::AsyncRequireAuthorizationLayer, request_id::PropagateRequestIdLayer, trace::TraceLayer, +}; +use tracing::{Span, error, info}; use uuid::Uuid; -use super::routes::{ - check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions, - grants, insights, metrics, metrics_json, status, terminate, +use super::{ + headers::X_REQUEST_ID, + middleware::authorize::Authorize, + routes::{ + check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions, + grants, insights, metrics, metrics_json, status, terminate, + }, }; use crate::compute::ComputeNode; -const X_REQUEST_ID: &str = "x-request-id"; - /// `compute_ctl` has two servers: internal and external. The internal server /// binds to the loopback interface and handles communication from clients on /// the compute. The external server is what receives communication from the /// control plane, the metrics scraper, etc. We make the distinction because /// certain routes in `compute_ctl` only need to be exposed to local processes /// like Postgres via the neon extension and local_proxy. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Debug)] pub enum Server { - Internal(u16), - External(u16), + Internal { + port: u16, + }, + External { + port: u16, + jwks: JwkSet, + compute_id: String, + }, } impl Display for Server { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Server::Internal(_) => f.write_str("internal"), - Server::External(_) => f.write_str("external"), + Server::Internal { .. } => f.write_str("internal"), + Server::External { .. } => f.write_str("external"), } } } -impl From for Router> { - fn from(server: Server) -> Self { +impl From<&Server> for Router> { + fn from(server: &Server) -> Self { let mut router = Router::>::new(); router = match server { - Server::Internal(_) => { + Server::Internal { .. } => { router = router .route( "/extension_server/{*filename}", @@ -69,59 +79,71 @@ impl From for Router> { router } - Server::External(_) => router - .route("/check_writability", post(check_writability::is_writable)) - .route("/configure", post(configure::configure)) - .route("/database_schema", get(database_schema::get_schema_dump)) - .route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects)) - .route("/insights", get(insights::get_insights)) - .route("/metrics", get(metrics::get_metrics)) - .route("/metrics.json", get(metrics_json::get_metrics)) - .route("/status", get(status::get_status)) - .route("/terminate", post(terminate::terminate)), + Server::External { + jwks, compute_id, .. + } => { + let unauthenticated_router = + Router::>::new().route("/metrics", get(metrics::get_metrics)); + + let authenticated_router = Router::>::new() + .route("/check_writability", post(check_writability::is_writable)) + .route("/configure", post(configure::configure)) + .route("/database_schema", get(database_schema::get_schema_dump)) + .route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects)) + .route("/insights", get(insights::get_insights)) + .route("/metrics.json", get(metrics_json::get_metrics)) + .route("/status", get(status::get_status)) + .route("/terminate", post(terminate::terminate)) + .layer(AsyncRequireAuthorizationLayer::new(Authorize::new( + compute_id.clone(), + jwks.clone(), + ))); + + router + .merge(unauthenticated_router) + .merge(authenticated_router) + } }; - router.fallback(Server::handle_404).method_not_allowed_fallback(Server::handle_405).layer( - ServiceBuilder::new() - // Add this middleware since we assume the request ID exists - .layer(middleware::from_fn(maybe_add_request_id_header)) - .layer( - TraceLayer::new_for_http() - .on_request(|request: &http::Request<_>, _span: &Span| { - let request_id = request - .headers() - .get(X_REQUEST_ID) - .unwrap() - .to_str() - .unwrap(); - - match request.uri().path() { - "/metrics" => { - debug!(%request_id, "{} {}", request.method(), request.uri()) - } - _ => info!(%request_id, "{} {}", request.method(), request.uri()), - }; - }) - .on_response( - |response: &http::Response<_>, latency: Duration, _span: &Span| { - let request_id = response + router + .fallback(Server::handle_404) + .method_not_allowed_fallback(Server::handle_405) + .layer( + ServiceBuilder::new() + .layer(tower_otel::trace::HttpLayer::server(tracing::Level::INFO)) + // Add this middleware since we assume the request ID exists + .layer(middleware::from_fn(maybe_add_request_id_header)) + .layer( + TraceLayer::new_for_http() + .on_request(|request: &http::Request<_>, _span: &Span| { + let request_id = request .headers() .get(X_REQUEST_ID) .unwrap() .to_str() .unwrap(); - info!( - %request_id, - code = response.status().as_u16(), - latency = latency.as_millis() - ) - }, - ), - ) - .layer(PropagateRequestIdLayer::x_request_id()), - ) - .layer(tower_otel::trace::HttpLayer::server(tracing::Level::INFO)) + info!(%request_id, "{} {}", request.method(), request.uri()); + }) + .on_response( + |response: &http::Response<_>, latency: Duration, _span: &Span| { + let request_id = response + .headers() + .get(X_REQUEST_ID) + .unwrap() + .to_str() + .unwrap(); + + info!( + %request_id, + code = response.status().as_u16(), + latency = latency.as_millis() + ); + }, + ), + ) + .layer(PropagateRequestIdLayer::x_request_id()), + ) } } @@ -145,15 +167,15 @@ impl Server { match self { // TODO: Change this to Ipv6Addr::LOCALHOST when the GitHub runners // allow binding to localhost - Server::Internal(_) => IpAddr::from(Ipv6Addr::UNSPECIFIED), - Server::External(_) => IpAddr::from(Ipv6Addr::UNSPECIFIED), + Server::Internal { .. } => IpAddr::from(Ipv6Addr::UNSPECIFIED), + Server::External { .. } => IpAddr::from(Ipv6Addr::UNSPECIFIED), } } - fn port(self) -> u16 { + fn port(&self) -> u16 { match self { - Server::Internal(port) => port, - Server::External(port) => port, + Server::Internal { port, .. } => *port, + Server::External { port, .. } => *port, } } @@ -180,7 +202,9 @@ impl Server { ); } - let router = Router::from(self).with_state(compute); + let router = Router::from(&self) + .with_state(compute) + .into_make_service_with_connect_info::(); if let Err(e) = axum::serve(listener, router).await { error!("compute_ctl {} HTTP server error: {}", self, e); diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index 35c580bd37..3300fbf7dd 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -134,8 +134,10 @@ pub struct CatalogObjects { pub databases: Vec, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct ComputeCtlConfig { + /// Set of JSON web keys that the compute can use to authenticate + /// communication from the control plane. pub jwks: JwkSet, } diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index 1b7c376560..183cc66ab9 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -18,7 +18,7 @@ license.workspace = true ahash = { version = "0.8" } anyhow = { version = "1", features = ["backtrace"] } base64-594e8ee84c453af0 = { package = "base64", version = "0.13", features = ["alloc"] } -base64-647d43efb71741da = { package = "base64", version = "0.21", features = ["alloc"] } +base64-647d43efb71741da = { package = "base64", version = "0.21" } base64ct = { version = "1", default-features = false, features = ["std"] } bytes = { version = "1", features = ["serde"] } camino = { version = "1", default-features = false, features = ["serde1"] }