place the entire rest_broker code under a feature flag

This commit is contained in:
Ruslan Talpa
2025-07-04 12:46:48 +03:00
parent afa4e48071
commit 54da030a2d
7 changed files with 95 additions and 54 deletions

View File

@@ -5,9 +5,9 @@ edition = "2024"
license.workspace = true
[features]
default = ["subzero"]
default = ["rest_broker"]
testing = ["dep:tokio-postgres"]
subzero = ["subzero-core", "jsonpath_lib", "ouroboros"]
rest_broker = ["subzero-core", "jsonpath_lib", "ouroboros"]
[dependencies]
ahash.workspace = true

View File

@@ -170,8 +170,8 @@ Create a configuration file called `local_proxy.json` in the root of the repo (u
Start the local proxy:
```sh
cargo run --bin local_proxy -- \
--disable_pg_session_jwt true \
cargo run --bin local_proxy --features testing -- \
--disable-pg-session-jwt \
--http 0.0.0.0:7432
```
@@ -198,3 +198,9 @@ curl -k "https://foo.local.neon.build:8080/sql" \
-H "neon-connection-string: postgresql://authenticator@foo.local.neon.build/database" \
-d '{"query":"select 1","params":[]}'
```
Make a rest request against the auth broker (rest broker):
```sh
curl -k "https://foo.local.neon.build:8080/database/rest/v1/items?select=id,name&id=eq.1" \
-H "Authorization: Bearer $NEON_JWT"
```

View File

@@ -22,9 +22,11 @@ use crate::auth::backend::jwt::JwkCache;
use crate::auth::backend::local::LocalBackend;
use crate::auth::{self};
use crate::cancellation::CancellationHandler;
#[cfg(feature = "rest_broker")]
use crate::config::RestConfig;
use crate::config::refresh_config_loop;
use crate::config::{
self, AuthenticationConfig, ComputeConfig, HttpConfig, ProxyConfig, RestConfig, RetryConfig,
self, AuthenticationConfig, ComputeConfig, HttpConfig, ProxyConfig, RetryConfig,
};
use crate::control_plane::locks::ApiLocks;
use crate::http::health_server::AppMetrics;
@@ -278,6 +280,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
accept_jwts: true,
console_redirect_confirmation_timeout: Duration::ZERO,
},
#[cfg(feature = "rest_broker")]
rest_config: RestConfig {
is_rest_broker: false,
db_schema_cache: None,

View File

@@ -27,11 +27,13 @@ use crate::auth::backend::local::LocalBackend;
use crate::auth::backend::{ConsoleRedirectBackend, MaybeOwned};
use crate::batch::BatchQueue;
use crate::cancellation::{CancellationHandler, CancellationProcessor};
#[cfg(feature = "rest_broker")]
use crate::config::RestConfig;
#[cfg(any(test, feature = "testing"))]
use crate::config::refresh_config_loop;
use crate::config::{
self, AuthenticationConfig, CacheOptions, ComputeConfig, HttpConfig, ProjectInfoCacheOptions,
ProxyConfig, ProxyProtocolV2, RestConfig, remote_storage_from_toml,
ProxyConfig, ProxyProtocolV2, remote_storage_from_toml,
};
use crate::context::parquet::ParquetUploadArgs;
use crate::http::health_server::AppMetrics;
@@ -43,6 +45,7 @@ use crate::redis::{elasticache, notifications};
use crate::scram::threadpool::ThreadPool;
use crate::serverless::GlobalConnPoolOptions;
use crate::serverless::cancel_set::CancelSet;
#[cfg(feature = "rest_broker")]
use crate::serverless::rest::DbSchemaCache;
use crate::tls::client_config::compute_client_config_with_root_certs;
#[cfg(any(test, feature = "testing"))]
@@ -245,10 +248,12 @@ struct ProxyCliArgs {
/// if this is not local proxy, this toggles whether we accept Postgres REST requests
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
#[cfg(feature = "rest_broker")]
is_rest_broker: bool,
/// cache for `db_schema_cache` introspection (use `size=0` to disable)
#[clap(long, default_value = "size=1000,ttl=1h")]
#[cfg(feature = "rest_broker")]
db_schema_cache: String,
}
@@ -517,6 +522,7 @@ pub async fn run() -> anyhow::Result<()> {
maintenance_tasks.spawn(control_plane::mgmt::task_main(mgmt_listener));
// add a task to flush the db_schema cache every 10 minutes
#[cfg(feature = "rest_broker")]
if let Some(db_schema_cache) = &config.rest_config.db_schema_cache {
maintenance_tasks.spawn(async move {
loop {
@@ -696,22 +702,26 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
timeout: Duration::from_secs(2),
};
let db_schema_cache_config: CacheOptions = args.db_schema_cache.parse()?;
info!("Using DbSchemaCache with options={db_schema_cache_config:?}");
#[cfg(feature = "rest_broker")]
let rest_config = {
let db_schema_cache_config: CacheOptions = args.db_schema_cache.parse()?;
info!("Using DbSchemaCache with options={db_schema_cache_config:?}");
let db_schema_cache = if args.is_rest_broker {
Some(DbSchemaCache::new(
"db_schema_cache",
db_schema_cache_config.size,
db_schema_cache_config.ttl,
true,
))
} else {
None
};
let rest_config = RestConfig {
is_rest_broker: args.is_rest_broker,
db_schema_cache,
let db_schema_cache = if args.is_rest_broker {
Some(DbSchemaCache::new(
"db_schema_cache",
db_schema_cache_config.size,
db_schema_cache_config.ttl,
true,
))
} else {
None
};
RestConfig {
is_rest_broker: args.is_rest_broker,
db_schema_cache,
}
};
let config = ProxyConfig {
@@ -726,6 +736,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
connect_to_compute: compute_config,
#[cfg(feature = "testing")]
disable_pg_session_jwt: false,
#[cfg(feature = "rest_broker")]
rest_config,
};

View File

@@ -13,6 +13,7 @@ use crate::rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig}
use crate::scram::threadpool::ThreadPool;
use crate::serverless::GlobalConnPoolOptions;
use crate::serverless::cancel_set::CancelSet;
#[cfg(feature = "rest_broker")]
use crate::serverless::rest::DbSchemaCache;
pub use crate::tls::server_config::{TlsConfig, configure_tls};
use crate::types::Host;
@@ -33,6 +34,7 @@ pub struct ProxyConfig {
pub metric_collection: Option<MetricCollectionConfig>,
pub http_config: HttpConfig,
pub authentication_config: AuthenticationConfig,
#[cfg(feature = "rest_broker")]
pub rest_config: RestConfig,
pub proxy_protocol_v2: ProxyProtocolV2,
pub handshake_timeout: Duration,
@@ -84,6 +86,7 @@ pub struct AuthenticationConfig {
pub console_redirect_confirmation_timeout: tokio::time::Duration,
}
#[cfg(feature = "rest_broker")]
pub struct RestConfig {
pub is_rest_broker: bool,
pub db_schema_cache: Option<DbSchemaCache>,

View File

@@ -11,6 +11,7 @@ mod http_conn_pool;
mod http_util;
mod json;
mod local_conn_pool;
#[cfg(feature = "rest_broker")]
pub mod rest;
mod sql_over_http;
mod websocket;
@@ -487,25 +488,38 @@ async fn request_handler(
.status(StatusCode::OK) // 204 is also valid, but see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/OPTIONS#status_code
.body(Empty::new().map_err(|x| match x {}).boxed())
.map_err(|e| ApiError::InternalServerError(e.into()))
} else if config.rest_config.is_rest_broker && request.uri().path().starts_with("/rest") {
let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Http);
let span = ctx.span();
let testodrome_id = request
.headers()
.get("X-Neon-Query-ID")
.and_then(|value| value.to_str().ok())
.map(|s| s.to_string());
if let Some(query_id) = testodrome_id {
info!(parent: &ctx.span(), "testodrome query ID: {query_id}");
ctx.set_testodrome_id(query_id.into());
}
rest::handle(config, ctx, request, backend, http_cancellation_token)
.instrument(span)
.await
} else {
json_response(StatusCode::BAD_REQUEST, "query is not supported")
#[cfg(feature = "rest_broker")]
{
if config.rest_config.is_rest_broker && {
let path_parts: Vec<&str> = request.uri().path().split('/').collect();
path_parts.len() >= 3 && path_parts[2].starts_with("rest")
} {
let ctx =
RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Http);
let span = ctx.span();
let testodrome_id = request
.headers()
.get("X-Neon-Query-ID")
.and_then(|value| value.to_str().ok())
.map(|s| s.to_string());
if let Some(query_id) = testodrome_id {
info!(parent: &ctx.span(), "testodrome query ID: {query_id}");
ctx.set_testodrome_id(query_id.into());
}
rest::handle(config, ctx, request, backend, http_cancellation_token)
.instrument(span)
.await
} else {
json_response(StatusCode::BAD_REQUEST, "query is not supported")
}
}
#[cfg(not(feature = "rest_broker"))]
{
json_response(StatusCode::BAD_REQUEST, "query is not supported")
}
}
}

View File

@@ -55,7 +55,7 @@ use subzero_core::{
},
parser::postgrest::parse,
permissions::check_safe_functions,
schema::{DbSchema, POSTGRESQL_CONFIGURATION_SQL, POSTGRESQL_INTROSPECTION_SQL},
schema::{DbSchema, POSTGRESQL_INTROSPECTION_SQL, get_postgresql_configuration_query},
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
@@ -66,7 +66,6 @@ static MAX_SCHEMA_SIZE: usize = 1024 * 1024 * 5; // 5MB
static MAX_HTTP_BODY_SIZE: usize = 10 * 1024 * 1024; // 10MB limit
static EMPTY_JSON_SCHEMA: &str = r#"{"schemas":[]}"#;
const INTROSPECTION_SQL: &str = POSTGRESQL_INTROSPECTION_SQL;
const CONFIGURATION_SQL: &str = POSTGRESQL_CONFIGURATION_SQL;
// A wrapper around the DbSchema that allows for self-referencing
#[self_referencing]
@@ -174,7 +173,8 @@ impl DbSchemaCache {
(&RAW_TEXT_OUTPUT, HeaderValue::from_str("true").unwrap()),
];
let body = serde_json::json!({"query": CONFIGURATION_SQL});
let query = get_postgresql_configuration_query(Some("pgrst.pre_config"));
let body = serde_json::json!({"query": query});
let (response_status, mut response_json) =
make_local_proxy_request(client, headers, body).await?;
@@ -641,19 +641,22 @@ async fn handle_inner(
"handling interactive connection from client"
);
let endpoint_id = request
.uri()
.host()
.unwrap_or("")
.split('.')
.next()
.unwrap_or("");
let host = request.uri().host().unwrap_or("");
let path_parts: Vec<&str> = request.uri().path().split('/').collect();
// a valid path is /database/rest/v1/... so parts should be ["", "database", "rest", "v1", ...]
let database_name = if path_parts.len() >= 3 && path_parts[1].len() > 0 {
Ok(path_parts[1])
} else {
Err(RestError::SubzeroCore(NotFound {
target: request.uri().path().to_string(),
}))
}?;
// we always use the authenticator role to connect to the database
let autheticator_role = "authenticator";
let connection_string = format!(
"postgresql://{}@{}.local.neon.build/database", //FIXME: how do we get the database name knowing only the endpoint id?
autheticator_role, endpoint_id
"postgresql://{}@{}/{}",
autheticator_role, host, database_name
);
let conn_info = get_conn_info(
@@ -669,9 +672,11 @@ async fn handle_inner(
match conn_info.auth {
AuthData::Jwt(jwt) if config.authentication_config.is_auth_broker => {
let api_prefix = format!("/{}/rest/v1/", database_name);
handle_rest_inner(
config,
ctx,
&api_prefix,
request,
&connection_string,
conn_info.conn_info,
@@ -689,6 +694,7 @@ async fn handle_inner(
async fn handle_rest_inner(
config: &'static ProxyConfig,
ctx: &RequestContext,
api_prefix: &str,
request: Request<Incoming>,
connection_string: &str,
conn_info: ConnInfo,
@@ -711,8 +717,6 @@ async fn handle_rest_inner(
};
// hardcoded values for now, these should come from a config per tenant
let api_prefix = "/rest/v1/";
let endpoint_cache_key = conn_info.endpoint_cache_key().unwrap();
let mut client = backend.connect_to_local_proxy(ctx, conn_info).await?;
let (parts, originial_body) = request.into_parts();