mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 12:02:55 +00:00
Authorize compute_ctl requests from the control plane (#10530)
The compute should only act if requests come from the control plane. Signed-off-by: Tristan Partin <tristan@neon.tech> Signed-off-by: Tristan Partin <tristan@neon.tech>
This commit is contained in:
68
Cargo.lock
generated
68
Cargo.lock
generated
@@ -783,6 +783,28 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "axum-extra"
|
||||
version = "0.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "460fc6f625a1f7705c6cf62d0d070794e94668988b1c38111baeec177c715f7b"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"axum-core",
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"headers",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
"mime",
|
||||
"pin-project-lite",
|
||||
"serde",
|
||||
"tower 0.5.2",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "azure_core"
|
||||
version = "0.21.0"
|
||||
@@ -925,9 +947,9 @@ checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.21.1"
|
||||
version = "0.21.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3f1e31e207a6b8fb791a38ea3105e6cb541f55e4d029902d3039a4ad07cc4105"
|
||||
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
@@ -1305,6 +1327,7 @@ dependencies = [
|
||||
"aws-sdk-s3",
|
||||
"aws-smithy-types",
|
||||
"axum",
|
||||
"axum-extra",
|
||||
"base64 0.13.1",
|
||||
"bytes",
|
||||
"camino",
|
||||
@@ -1316,6 +1339,7 @@ dependencies = [
|
||||
"flate2",
|
||||
"futures",
|
||||
"http 1.1.0",
|
||||
"jsonwebtoken",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
"notify",
|
||||
@@ -2297,7 +2321,7 @@ name = "framed-websockets"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/framed-websockets#34eff3d6f8cfccbc5f35e4f65314ff7328621127"
|
||||
dependencies = [
|
||||
"base64 0.21.1",
|
||||
"base64 0.21.7",
|
||||
"bytemuck",
|
||||
"bytes",
|
||||
"futures-core",
|
||||
@@ -2653,7 +2677,7 @@ version = "7.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
|
||||
dependencies = [
|
||||
"base64 0.21.1",
|
||||
"base64 0.21.7",
|
||||
"byteorder",
|
||||
"crossbeam-channel",
|
||||
"flate2",
|
||||
@@ -2661,6 +2685,30 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "headers"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9"
|
||||
dependencies = [
|
||||
"base64 0.21.7",
|
||||
"bytes",
|
||||
"headers-core",
|
||||
"http 1.1.0",
|
||||
"httpdate",
|
||||
"mime",
|
||||
"sha1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "headers-core"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4"
|
||||
dependencies = [
|
||||
"http 1.1.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heck"
|
||||
version = "0.5.0"
|
||||
@@ -3385,7 +3433,7 @@ version = "9.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5c7ea04a7c5c055c175f189b6dc6ba036fd62306b58c66c9f6389036c503a3f4"
|
||||
dependencies = [
|
||||
"base64 0.21.1",
|
||||
"base64 0.21.7",
|
||||
"js-sys",
|
||||
"pem",
|
||||
"ring",
|
||||
@@ -4467,7 +4515,7 @@ version = "3.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310"
|
||||
dependencies = [
|
||||
"base64 0.21.1",
|
||||
"base64 0.21.7",
|
||||
"serde",
|
||||
]
|
||||
|
||||
@@ -5814,7 +5862,7 @@ version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
|
||||
dependencies = [
|
||||
"base64 0.21.1",
|
||||
"base64 0.21.7",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5823,7 +5871,7 @@ version = "2.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f48172685e6ff52a556baa527774f61fcaa884f59daf3375c62a3f1cd2549dab"
|
||||
dependencies = [
|
||||
"base64 0.21.1",
|
||||
"base64 0.21.7",
|
||||
"rustls-pki-types",
|
||||
]
|
||||
|
||||
@@ -7357,10 +7405,12 @@ version = "0.6.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"bitflags 2.8.0",
|
||||
"bytes",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"mime",
|
||||
"pin-project-lite",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
@@ -8267,7 +8317,7 @@ dependencies = [
|
||||
"ahash",
|
||||
"anyhow",
|
||||
"base64 0.13.1",
|
||||
"base64 0.21.1",
|
||||
"base64 0.21.7",
|
||||
"base64ct",
|
||||
"bytes",
|
||||
"camino",
|
||||
|
||||
@@ -67,6 +67,7 @@ aws-credential-types = "1.2.0"
|
||||
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
|
||||
aws-types = "1.3"
|
||||
axum = { version = "0.8.1", features = ["ws"] }
|
||||
axum-extra = { version = "0.10.0", features = ["typed-header"] }
|
||||
base64 = "0.13.0"
|
||||
bincode = "1.3"
|
||||
bindgen = "0.71"
|
||||
@@ -191,7 +192,7 @@ toml = "0.8"
|
||||
toml_edit = "0.22"
|
||||
tonic = {version = "0.12.3", default-features = false, features = ["channel", "tls", "tls-roots"]}
|
||||
tower = { version = "0.5.2", default-features = false }
|
||||
tower-http = { version = "0.6.2", features = ["request-id", "trace"] }
|
||||
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
|
||||
|
||||
# This revision uses opentelemetry 0.27. There's no tag for it.
|
||||
tower-otel = { git = "https://github.com/mattiapenati/tower-otel", rev = "56a7321053bcb72443888257b622ba0d43a11fcd" }
|
||||
|
||||
@@ -17,6 +17,7 @@ aws-sdk-kms.workspace = true
|
||||
aws-smithy-types.workspace = true
|
||||
anyhow.workspace = true
|
||||
axum = { workspace = true, features = [] }
|
||||
axum-extra.workspace = true
|
||||
camino.workspace = true
|
||||
chrono.workspace = true
|
||||
cfg-if.workspace = true
|
||||
@@ -25,6 +26,7 @@ fail.workspace = true
|
||||
flate2.workspace = true
|
||||
futures.workspace = true
|
||||
http.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
metrics.workspace = true
|
||||
nix.workspace = true
|
||||
notify.workspace = true
|
||||
|
||||
@@ -179,6 +179,7 @@ fn main() -> Result<()> {
|
||||
live_config_allowed: cli_spec.live_config_allowed,
|
||||
},
|
||||
cli_spec.spec,
|
||||
cli_spec.compute_ctl_config,
|
||||
)?;
|
||||
|
||||
let exit_code = compute_node.run()?;
|
||||
|
||||
@@ -11,7 +11,7 @@ use std::{env, fs};
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use compute_api::privilege::Privilege;
|
||||
use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
||||
use compute_api::responses::{ComputeCtlConfig, ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent};
|
||||
use futures::StreamExt;
|
||||
use futures::future::join_all;
|
||||
@@ -132,6 +132,8 @@ pub struct ComputeState {
|
||||
/// passed by the control plane with a /configure HTTP request.
|
||||
pub pspec: Option<ParsedSpec>,
|
||||
|
||||
pub compute_ctl_config: ComputeCtlConfig,
|
||||
|
||||
/// If the spec is passed by a /configure request, 'startup_span' is the
|
||||
/// /configure request's tracing span. The main thread enters it when it
|
||||
/// processes the compute startup, so that the compute startup is considered
|
||||
@@ -155,6 +157,7 @@ impl ComputeState {
|
||||
last_active: None,
|
||||
error: None,
|
||||
pspec: None,
|
||||
compute_ctl_config: ComputeCtlConfig::default(),
|
||||
startup_span: None,
|
||||
metrics: ComputeMetrics::default(),
|
||||
}
|
||||
@@ -365,7 +368,11 @@ pub(crate) fn construct_superuser_query(spec: &ComputeSpec) -> String {
|
||||
}
|
||||
|
||||
impl ComputeNode {
|
||||
pub fn new(params: ComputeNodeParams, cli_spec: Option<ComputeSpec>) -> Result<Self> {
|
||||
pub fn new(
|
||||
params: ComputeNodeParams,
|
||||
cli_spec: Option<ComputeSpec>,
|
||||
compute_ctl_config: ComputeCtlConfig,
|
||||
) -> Result<Self> {
|
||||
let connstr = params.connstr.as_str();
|
||||
let conn_conf = postgres::config::Config::from_str(connstr)
|
||||
.context("cannot build postgres config from connstr")?;
|
||||
@@ -377,6 +384,7 @@ impl ComputeNode {
|
||||
let pspec = ParsedSpec::try_from(cli_spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
new_state.pspec = Some(pspec);
|
||||
}
|
||||
new_state.compute_ctl_config = compute_ctl_config;
|
||||
|
||||
Ok(ComputeNode {
|
||||
params,
|
||||
@@ -405,11 +413,19 @@ impl ComputeNode {
|
||||
|
||||
// Launch the external HTTP server first, so that we can serve control plane
|
||||
// requests while configuration is still in progress.
|
||||
crate::http::server::Server::External(this.params.external_http_port).launch(&this);
|
||||
crate::http::server::Server::External {
|
||||
port: this.params.external_http_port,
|
||||
jwks: this.state.lock().unwrap().compute_ctl_config.jwks.clone(),
|
||||
compute_id: this.params.compute_id.clone(),
|
||||
}
|
||||
.launch(&this);
|
||||
|
||||
// The internal HTTP server could be launched later, but there isn't much
|
||||
// sense in waiting.
|
||||
crate::http::server::Server::Internal(this.params.internal_http_port).launch(&this);
|
||||
crate::http::server::Server::Internal {
|
||||
port: this.params.internal_http_port,
|
||||
}
|
||||
.launch(&this);
|
||||
|
||||
// If we got a spec from the CLI already, use that. Otherwise wait for the
|
||||
// control plane to pass it to us with a /configure HTTP request
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
pub(crate) mod json;
|
||||
pub(crate) mod path;
|
||||
pub(crate) mod query;
|
||||
pub(crate) mod request_id;
|
||||
|
||||
pub(crate) use json::Json;
|
||||
pub(crate) use path::Path;
|
||||
pub(crate) use query::Query;
|
||||
pub(crate) use request_id::RequestId;
|
||||
|
||||
86
compute_tools/src/http/extract/request_id.rs
Normal file
86
compute_tools/src/http/extract/request_id.rs
Normal file
@@ -0,0 +1,86 @@
|
||||
use std::{
|
||||
fmt::Display,
|
||||
ops::{Deref, DerefMut},
|
||||
};
|
||||
|
||||
use axum::{extract::FromRequestParts, response::IntoResponse};
|
||||
use http::{StatusCode, request::Parts};
|
||||
|
||||
use crate::http::{JsonResponse, headers::X_REQUEST_ID};
|
||||
|
||||
/// Extract the request ID from the `X-Request-Id` header.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(crate) struct RequestId(pub String);
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Rejection used for [`RequestId`].
|
||||
///
|
||||
/// Contains one variant for each way the [`RequestId`] extractor can
|
||||
/// fail.
|
||||
pub(crate) enum RequestIdRejection {
|
||||
/// The request is missing the header.
|
||||
MissingRequestId,
|
||||
|
||||
/// The value of the header is invalid UTF-8.
|
||||
InvalidUtf8,
|
||||
}
|
||||
|
||||
impl RequestIdRejection {
|
||||
pub fn status(&self) -> StatusCode {
|
||||
match self {
|
||||
RequestIdRejection::MissingRequestId => StatusCode::INTERNAL_SERVER_ERROR,
|
||||
RequestIdRejection::InvalidUtf8 => StatusCode::BAD_REQUEST,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn message(&self) -> String {
|
||||
match self {
|
||||
RequestIdRejection::MissingRequestId => "request ID is missing",
|
||||
RequestIdRejection::InvalidUtf8 => "request ID is invalid UTF-8",
|
||||
}
|
||||
.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoResponse for RequestIdRejection {
|
||||
fn into_response(self) -> axum::response::Response {
|
||||
JsonResponse::error(self.status(), self.message())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> FromRequestParts<S> for RequestId
|
||||
where
|
||||
S: Send + Sync,
|
||||
{
|
||||
type Rejection = RequestIdRejection;
|
||||
|
||||
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
|
||||
match parts.headers.get(X_REQUEST_ID) {
|
||||
Some(value) => match value.to_str() {
|
||||
Ok(request_id) => Ok(Self(request_id.to_string())),
|
||||
Err(_) => Err(RequestIdRejection::InvalidUtf8),
|
||||
},
|
||||
None => Err(RequestIdRejection::MissingRequestId),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for RequestId {
|
||||
type Target = String;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for RequestId {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for RequestId {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(&self.0)
|
||||
}
|
||||
}
|
||||
2
compute_tools/src/http/headers.rs
Normal file
2
compute_tools/src/http/headers.rs
Normal file
@@ -0,0 +1,2 @@
|
||||
/// Constant for `X-Request-Id` header.
|
||||
pub const X_REQUEST_ID: &str = "x-request-id";
|
||||
145
compute_tools/src/http/middleware/authorize.rs
Normal file
145
compute_tools/src/http/middleware/authorize.rs
Normal file
@@ -0,0 +1,145 @@
|
||||
use std::{collections::HashSet, net::SocketAddr};
|
||||
|
||||
use anyhow::{Result, anyhow};
|
||||
use axum::{RequestExt, body::Body, extract::ConnectInfo};
|
||||
use axum_extra::{
|
||||
TypedHeader,
|
||||
headers::{Authorization, authorization::Bearer},
|
||||
};
|
||||
use futures::future::BoxFuture;
|
||||
use http::{Request, Response, StatusCode};
|
||||
use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet};
|
||||
use serde::Deserialize;
|
||||
use tower_http::auth::AsyncAuthorizeRequest;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::http::{JsonResponse, extract::RequestId};
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub(in crate::http) struct Claims {
|
||||
compute_id: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(in crate::http) struct Authorize {
|
||||
compute_id: String,
|
||||
jwks: JwkSet,
|
||||
validation: Validation,
|
||||
}
|
||||
|
||||
impl Authorize {
|
||||
pub fn new(compute_id: String, jwks: JwkSet) -> Self {
|
||||
let mut validation = Validation::new(Algorithm::EdDSA);
|
||||
// Nothing is currently required
|
||||
validation.required_spec_claims = HashSet::new();
|
||||
validation.validate_exp = true;
|
||||
// Unused by the control plane
|
||||
validation.validate_aud = false;
|
||||
// Unused by the control plane
|
||||
validation.validate_nbf = false;
|
||||
|
||||
Self {
|
||||
compute_id,
|
||||
jwks,
|
||||
validation,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncAuthorizeRequest<Body> for Authorize {
|
||||
type RequestBody = Body;
|
||||
type ResponseBody = Body;
|
||||
type Future = BoxFuture<'static, Result<Request<Body>, Response<Self::ResponseBody>>>;
|
||||
|
||||
fn authorize(&mut self, mut request: Request<Body>) -> Self::Future {
|
||||
let compute_id = self.compute_id.clone();
|
||||
let jwks = self.jwks.clone();
|
||||
let validation = self.validation.clone();
|
||||
|
||||
Box::pin(async move {
|
||||
let request_id = request.extract_parts::<RequestId>().await.unwrap();
|
||||
|
||||
// TODO: Remove this check after a successful rollout
|
||||
if jwks.keys.is_empty() {
|
||||
warn!(%request_id, "Authorization has not been configured");
|
||||
|
||||
return Ok(request);
|
||||
}
|
||||
|
||||
let connect_info = request
|
||||
.extract_parts::<ConnectInfo<SocketAddr>>()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// In the event the request is coming from the loopback interface,
|
||||
// allow all requests
|
||||
if connect_info.ip().is_loopback() {
|
||||
warn!(%request_id, "Bypassed authorization because request is coming from the loopback interface");
|
||||
|
||||
return Ok(request);
|
||||
}
|
||||
|
||||
let TypedHeader(Authorization(bearer)) = request
|
||||
.extract_parts::<TypedHeader<Authorization<Bearer>>>()
|
||||
.await
|
||||
.map_err(|_| {
|
||||
JsonResponse::error(StatusCode::BAD_REQUEST, "invalid authorization token")
|
||||
})?;
|
||||
|
||||
let data = match Self::verify(&jwks, bearer.token(), &validation) {
|
||||
Ok(claims) => claims,
|
||||
Err(e) => return Err(JsonResponse::error(StatusCode::UNAUTHORIZED, e)),
|
||||
};
|
||||
|
||||
if data.claims.compute_id != compute_id {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"invalid claims in authorization token",
|
||||
));
|
||||
}
|
||||
|
||||
// Make claims available to any subsequent middleware or request
|
||||
// handlers
|
||||
request.extensions_mut().insert(data.claims);
|
||||
|
||||
Ok(request)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Authorize {
|
||||
/// Verify the token using the JSON Web Key set and return the token data.
|
||||
fn verify(jwks: &JwkSet, token: &str, validation: &Validation) -> Result<TokenData<Claims>> {
|
||||
debug_assert!(!jwks.keys.is_empty());
|
||||
|
||||
for jwk in jwks.keys.iter() {
|
||||
let decoding_key = match DecodingKey::from_jwk(jwk) {
|
||||
Ok(key) => key,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to construct decoding key from {}: {}",
|
||||
jwk.common.key_id.as_ref().unwrap(),
|
||||
e
|
||||
);
|
||||
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match jsonwebtoken::decode::<Claims>(token, &decoding_key, validation) {
|
||||
Ok(data) => return Ok(data),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to decode authorization token using {}: {}",
|
||||
jwk.common.key_id.as_ref().unwrap(),
|
||||
e
|
||||
);
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(anyhow!("Failed to verify authorization token"))
|
||||
}
|
||||
}
|
||||
1
compute_tools/src/http/middleware/mod.rs
Normal file
1
compute_tools/src/http/middleware/mod.rs
Normal file
@@ -0,0 +1 @@
|
||||
pub(in crate::http) mod authorize;
|
||||
@@ -7,6 +7,8 @@ use serde::Serialize;
|
||||
use tracing::error;
|
||||
|
||||
mod extract;
|
||||
mod headers;
|
||||
mod middleware;
|
||||
mod routes;
|
||||
pub mod server;
|
||||
|
||||
|
||||
@@ -10,48 +10,58 @@ use axum::middleware::{self, Next};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::routing::{get, post};
|
||||
use http::StatusCode;
|
||||
use jsonwebtoken::jwk::JwkSet;
|
||||
use tokio::net::TcpListener;
|
||||
use tower::ServiceBuilder;
|
||||
use tower_http::request_id::PropagateRequestIdLayer;
|
||||
use tower_http::trace::TraceLayer;
|
||||
use tracing::{Span, debug, error, info};
|
||||
use tower_http::{
|
||||
auth::AsyncRequireAuthorizationLayer, request_id::PropagateRequestIdLayer, trace::TraceLayer,
|
||||
};
|
||||
use tracing::{Span, error, info};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::routes::{
|
||||
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
|
||||
grants, insights, metrics, metrics_json, status, terminate,
|
||||
use super::{
|
||||
headers::X_REQUEST_ID,
|
||||
middleware::authorize::Authorize,
|
||||
routes::{
|
||||
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
|
||||
grants, insights, metrics, metrics_json, status, terminate,
|
||||
},
|
||||
};
|
||||
use crate::compute::ComputeNode;
|
||||
|
||||
const X_REQUEST_ID: &str = "x-request-id";
|
||||
|
||||
/// `compute_ctl` has two servers: internal and external. The internal server
|
||||
/// binds to the loopback interface and handles communication from clients on
|
||||
/// the compute. The external server is what receives communication from the
|
||||
/// control plane, the metrics scraper, etc. We make the distinction because
|
||||
/// certain routes in `compute_ctl` only need to be exposed to local processes
|
||||
/// like Postgres via the neon extension and local_proxy.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum Server {
|
||||
Internal(u16),
|
||||
External(u16),
|
||||
Internal {
|
||||
port: u16,
|
||||
},
|
||||
External {
|
||||
port: u16,
|
||||
jwks: JwkSet,
|
||||
compute_id: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl Display for Server {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Server::Internal(_) => f.write_str("internal"),
|
||||
Server::External(_) => f.write_str("external"),
|
||||
Server::Internal { .. } => f.write_str("internal"),
|
||||
Server::External { .. } => f.write_str("external"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Server> for Router<Arc<ComputeNode>> {
|
||||
fn from(server: Server) -> Self {
|
||||
impl From<&Server> for Router<Arc<ComputeNode>> {
|
||||
fn from(server: &Server) -> Self {
|
||||
let mut router = Router::<Arc<ComputeNode>>::new();
|
||||
|
||||
router = match server {
|
||||
Server::Internal(_) => {
|
||||
Server::Internal { .. } => {
|
||||
router = router
|
||||
.route(
|
||||
"/extension_server/{*filename}",
|
||||
@@ -69,59 +79,71 @@ impl From<Server> for Router<Arc<ComputeNode>> {
|
||||
|
||||
router
|
||||
}
|
||||
Server::External(_) => router
|
||||
.route("/check_writability", post(check_writability::is_writable))
|
||||
.route("/configure", post(configure::configure))
|
||||
.route("/database_schema", get(database_schema::get_schema_dump))
|
||||
.route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects))
|
||||
.route("/insights", get(insights::get_insights))
|
||||
.route("/metrics", get(metrics::get_metrics))
|
||||
.route("/metrics.json", get(metrics_json::get_metrics))
|
||||
.route("/status", get(status::get_status))
|
||||
.route("/terminate", post(terminate::terminate)),
|
||||
Server::External {
|
||||
jwks, compute_id, ..
|
||||
} => {
|
||||
let unauthenticated_router =
|
||||
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
|
||||
|
||||
let authenticated_router = Router::<Arc<ComputeNode>>::new()
|
||||
.route("/check_writability", post(check_writability::is_writable))
|
||||
.route("/configure", post(configure::configure))
|
||||
.route("/database_schema", get(database_schema::get_schema_dump))
|
||||
.route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects))
|
||||
.route("/insights", get(insights::get_insights))
|
||||
.route("/metrics.json", get(metrics_json::get_metrics))
|
||||
.route("/status", get(status::get_status))
|
||||
.route("/terminate", post(terminate::terminate))
|
||||
.layer(AsyncRequireAuthorizationLayer::new(Authorize::new(
|
||||
compute_id.clone(),
|
||||
jwks.clone(),
|
||||
)));
|
||||
|
||||
router
|
||||
.merge(unauthenticated_router)
|
||||
.merge(authenticated_router)
|
||||
}
|
||||
};
|
||||
|
||||
router.fallback(Server::handle_404).method_not_allowed_fallback(Server::handle_405).layer(
|
||||
ServiceBuilder::new()
|
||||
// Add this middleware since we assume the request ID exists
|
||||
.layer(middleware::from_fn(maybe_add_request_id_header))
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.on_request(|request: &http::Request<_>, _span: &Span| {
|
||||
let request_id = request
|
||||
.headers()
|
||||
.get(X_REQUEST_ID)
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap();
|
||||
|
||||
match request.uri().path() {
|
||||
"/metrics" => {
|
||||
debug!(%request_id, "{} {}", request.method(), request.uri())
|
||||
}
|
||||
_ => info!(%request_id, "{} {}", request.method(), request.uri()),
|
||||
};
|
||||
})
|
||||
.on_response(
|
||||
|response: &http::Response<_>, latency: Duration, _span: &Span| {
|
||||
let request_id = response
|
||||
router
|
||||
.fallback(Server::handle_404)
|
||||
.method_not_allowed_fallback(Server::handle_405)
|
||||
.layer(
|
||||
ServiceBuilder::new()
|
||||
.layer(tower_otel::trace::HttpLayer::server(tracing::Level::INFO))
|
||||
// Add this middleware since we assume the request ID exists
|
||||
.layer(middleware::from_fn(maybe_add_request_id_header))
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.on_request(|request: &http::Request<_>, _span: &Span| {
|
||||
let request_id = request
|
||||
.headers()
|
||||
.get(X_REQUEST_ID)
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap();
|
||||
|
||||
info!(
|
||||
%request_id,
|
||||
code = response.status().as_u16(),
|
||||
latency = latency.as_millis()
|
||||
)
|
||||
},
|
||||
),
|
||||
)
|
||||
.layer(PropagateRequestIdLayer::x_request_id()),
|
||||
)
|
||||
.layer(tower_otel::trace::HttpLayer::server(tracing::Level::INFO))
|
||||
info!(%request_id, "{} {}", request.method(), request.uri());
|
||||
})
|
||||
.on_response(
|
||||
|response: &http::Response<_>, latency: Duration, _span: &Span| {
|
||||
let request_id = response
|
||||
.headers()
|
||||
.get(X_REQUEST_ID)
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap();
|
||||
|
||||
info!(
|
||||
%request_id,
|
||||
code = response.status().as_u16(),
|
||||
latency = latency.as_millis()
|
||||
);
|
||||
},
|
||||
),
|
||||
)
|
||||
.layer(PropagateRequestIdLayer::x_request_id()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,15 +167,15 @@ impl Server {
|
||||
match self {
|
||||
// TODO: Change this to Ipv6Addr::LOCALHOST when the GitHub runners
|
||||
// allow binding to localhost
|
||||
Server::Internal(_) => IpAddr::from(Ipv6Addr::UNSPECIFIED),
|
||||
Server::External(_) => IpAddr::from(Ipv6Addr::UNSPECIFIED),
|
||||
Server::Internal { .. } => IpAddr::from(Ipv6Addr::UNSPECIFIED),
|
||||
Server::External { .. } => IpAddr::from(Ipv6Addr::UNSPECIFIED),
|
||||
}
|
||||
}
|
||||
|
||||
fn port(self) -> u16 {
|
||||
fn port(&self) -> u16 {
|
||||
match self {
|
||||
Server::Internal(port) => port,
|
||||
Server::External(port) => port,
|
||||
Server::Internal { port, .. } => *port,
|
||||
Server::External { port, .. } => *port,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -180,7 +202,9 @@ impl Server {
|
||||
);
|
||||
}
|
||||
|
||||
let router = Router::from(self).with_state(compute);
|
||||
let router = Router::from(&self)
|
||||
.with_state(compute)
|
||||
.into_make_service_with_connect_info::<SocketAddr>();
|
||||
|
||||
if let Err(e) = axum::serve(listener, router).await {
|
||||
error!("compute_ctl {} HTTP server error: {}", self, e);
|
||||
|
||||
@@ -134,8 +134,10 @@ pub struct CatalogObjects {
|
||||
pub databases: Vec<Database>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub struct ComputeCtlConfig {
|
||||
/// Set of JSON web keys that the compute can use to authenticate
|
||||
/// communication from the control plane.
|
||||
pub jwks: JwkSet,
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ license.workspace = true
|
||||
ahash = { version = "0.8" }
|
||||
anyhow = { version = "1", features = ["backtrace"] }
|
||||
base64-594e8ee84c453af0 = { package = "base64", version = "0.13", features = ["alloc"] }
|
||||
base64-647d43efb71741da = { package = "base64", version = "0.21", features = ["alloc"] }
|
||||
base64-647d43efb71741da = { package = "base64", version = "0.21" }
|
||||
base64ct = { version = "1", default-features = false, features = ["std"] }
|
||||
bytes = { version = "1", features = ["serde"] }
|
||||
camino = { version = "1", default-features = false, features = ["serde1"] }
|
||||
|
||||
Reference in New Issue
Block a user