diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 1e608063d7..9e97350eec 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -5,9 +5,9 @@ edition = "2024" license.workspace = true [features] -default = ["subzero"] +default = ["rest_broker"] testing = ["dep:tokio-postgres"] -subzero = ["subzero-core", "jsonpath_lib", "ouroboros"] +rest_broker = ["subzero-core", "jsonpath_lib", "ouroboros"] [dependencies] ahash.workspace = true diff --git a/proxy/README.md b/proxy/README.md index 26cceb42f9..09091951b2 100644 --- a/proxy/README.md +++ b/proxy/README.md @@ -170,8 +170,8 @@ Create a configuration file called `local_proxy.json` in the root of the repo (u Start the local proxy: ```sh -cargo run --bin local_proxy -- \ - --disable_pg_session_jwt true \ +cargo run --bin local_proxy --features testing -- \ + --disable-pg-session-jwt \ --http 0.0.0.0:7432 ``` @@ -198,3 +198,9 @@ curl -k "https://foo.local.neon.build:8080/sql" \ -H "neon-connection-string: postgresql://authenticator@foo.local.neon.build/database" \ -d '{"query":"select 1","params":[]}' ``` + +Make a rest request against the auth broker (rest broker): +```sh +curl -k "https://foo.local.neon.build:8080/database/rest/v1/items?select=id,name&id=eq.1" \ +-H "Authorization: Bearer $NEON_JWT" +``` diff --git a/proxy/src/binary/local_proxy.rs b/proxy/src/binary/local_proxy.rs index e28d55ab57..d9524b9f85 100644 --- a/proxy/src/binary/local_proxy.rs +++ b/proxy/src/binary/local_proxy.rs @@ -22,9 +22,11 @@ use crate::auth::backend::jwt::JwkCache; use crate::auth::backend::local::LocalBackend; use crate::auth::{self}; use crate::cancellation::CancellationHandler; +#[cfg(feature = "rest_broker")] +use crate::config::RestConfig; use crate::config::refresh_config_loop; use crate::config::{ - self, AuthenticationConfig, ComputeConfig, HttpConfig, ProxyConfig, RestConfig, RetryConfig, + self, AuthenticationConfig, ComputeConfig, HttpConfig, ProxyConfig, RetryConfig, }; use crate::control_plane::locks::ApiLocks; use crate::http::health_server::AppMetrics; @@ -278,6 +280,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig accept_jwts: true, console_redirect_confirmation_timeout: Duration::ZERO, }, + #[cfg(feature = "rest_broker")] rest_config: RestConfig { is_rest_broker: false, db_schema_cache: None, diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index 8dc4731637..47868598ef 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -27,11 +27,13 @@ use crate::auth::backend::local::LocalBackend; use crate::auth::backend::{ConsoleRedirectBackend, MaybeOwned}; use crate::batch::BatchQueue; use crate::cancellation::{CancellationHandler, CancellationProcessor}; +#[cfg(feature = "rest_broker")] +use crate::config::RestConfig; #[cfg(any(test, feature = "testing"))] use crate::config::refresh_config_loop; use crate::config::{ self, AuthenticationConfig, CacheOptions, ComputeConfig, HttpConfig, ProjectInfoCacheOptions, - ProxyConfig, ProxyProtocolV2, RestConfig, remote_storage_from_toml, + ProxyConfig, ProxyProtocolV2, remote_storage_from_toml, }; use crate::context::parquet::ParquetUploadArgs; use crate::http::health_server::AppMetrics; @@ -43,6 +45,7 @@ use crate::redis::{elasticache, notifications}; use crate::scram::threadpool::ThreadPool; use crate::serverless::GlobalConnPoolOptions; use crate::serverless::cancel_set::CancelSet; +#[cfg(feature = "rest_broker")] use crate::serverless::rest::DbSchemaCache; use crate::tls::client_config::compute_client_config_with_root_certs; #[cfg(any(test, feature = "testing"))] @@ -245,10 +248,12 @@ struct ProxyCliArgs { /// if this is not local proxy, this toggles whether we accept Postgres REST requests #[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)] + #[cfg(feature = "rest_broker")] is_rest_broker: bool, /// cache for `db_schema_cache` introspection (use `size=0` to disable) #[clap(long, default_value = "size=1000,ttl=1h")] + #[cfg(feature = "rest_broker")] db_schema_cache: String, } @@ -517,6 +522,7 @@ pub async fn run() -> anyhow::Result<()> { maintenance_tasks.spawn(control_plane::mgmt::task_main(mgmt_listener)); // add a task to flush the db_schema cache every 10 minutes + #[cfg(feature = "rest_broker")] if let Some(db_schema_cache) = &config.rest_config.db_schema_cache { maintenance_tasks.spawn(async move { loop { @@ -696,22 +702,26 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { timeout: Duration::from_secs(2), }; - let db_schema_cache_config: CacheOptions = args.db_schema_cache.parse()?; - info!("Using DbSchemaCache with options={db_schema_cache_config:?}"); + #[cfg(feature = "rest_broker")] + let rest_config = { + let db_schema_cache_config: CacheOptions = args.db_schema_cache.parse()?; + info!("Using DbSchemaCache with options={db_schema_cache_config:?}"); - let db_schema_cache = if args.is_rest_broker { - Some(DbSchemaCache::new( - "db_schema_cache", - db_schema_cache_config.size, - db_schema_cache_config.ttl, - true, - )) - } else { - None - }; - let rest_config = RestConfig { - is_rest_broker: args.is_rest_broker, - db_schema_cache, + let db_schema_cache = if args.is_rest_broker { + Some(DbSchemaCache::new( + "db_schema_cache", + db_schema_cache_config.size, + db_schema_cache_config.ttl, + true, + )) + } else { + None + }; + + RestConfig { + is_rest_broker: args.is_rest_broker, + db_schema_cache, + } }; let config = ProxyConfig { @@ -726,6 +736,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { connect_to_compute: compute_config, #[cfg(feature = "testing")] disable_pg_session_jwt: false, + #[cfg(feature = "rest_broker")] rest_config, }; diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 7e101e2331..630516a3e4 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -13,6 +13,7 @@ use crate::rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig} use crate::scram::threadpool::ThreadPool; use crate::serverless::GlobalConnPoolOptions; use crate::serverless::cancel_set::CancelSet; +#[cfg(feature = "rest_broker")] use crate::serverless::rest::DbSchemaCache; pub use crate::tls::server_config::{TlsConfig, configure_tls}; use crate::types::Host; @@ -33,6 +34,7 @@ pub struct ProxyConfig { pub metric_collection: Option, pub http_config: HttpConfig, pub authentication_config: AuthenticationConfig, + #[cfg(feature = "rest_broker")] pub rest_config: RestConfig, pub proxy_protocol_v2: ProxyProtocolV2, pub handshake_timeout: Duration, @@ -84,6 +86,7 @@ pub struct AuthenticationConfig { pub console_redirect_confirmation_timeout: tokio::time::Duration, } +#[cfg(feature = "rest_broker")] pub struct RestConfig { pub is_rest_broker: bool, pub db_schema_cache: Option, diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index bdc4b52eb0..3e0f3c8662 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -11,6 +11,7 @@ mod http_conn_pool; mod http_util; mod json; mod local_conn_pool; +#[cfg(feature = "rest_broker")] pub mod rest; mod sql_over_http; mod websocket; @@ -487,25 +488,38 @@ async fn request_handler( .status(StatusCode::OK) // 204 is also valid, but see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/OPTIONS#status_code .body(Empty::new().map_err(|x| match x {}).boxed()) .map_err(|e| ApiError::InternalServerError(e.into())) - } else if config.rest_config.is_rest_broker && request.uri().path().starts_with("/rest") { - let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Http); - let span = ctx.span(); - - let testodrome_id = request - .headers() - .get("X-Neon-Query-ID") - .and_then(|value| value.to_str().ok()) - .map(|s| s.to_string()); - - if let Some(query_id) = testodrome_id { - info!(parent: &ctx.span(), "testodrome query ID: {query_id}"); - ctx.set_testodrome_id(query_id.into()); - } - - rest::handle(config, ctx, request, backend, http_cancellation_token) - .instrument(span) - .await } else { - json_response(StatusCode::BAD_REQUEST, "query is not supported") + #[cfg(feature = "rest_broker")] + { + if config.rest_config.is_rest_broker && { + let path_parts: Vec<&str> = request.uri().path().split('/').collect(); + path_parts.len() >= 3 && path_parts[2].starts_with("rest") + } { + let ctx = + RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Http); + let span = ctx.span(); + + let testodrome_id = request + .headers() + .get("X-Neon-Query-ID") + .and_then(|value| value.to_str().ok()) + .map(|s| s.to_string()); + + if let Some(query_id) = testodrome_id { + info!(parent: &ctx.span(), "testodrome query ID: {query_id}"); + ctx.set_testodrome_id(query_id.into()); + } + + rest::handle(config, ctx, request, backend, http_cancellation_token) + .instrument(span) + .await + } else { + json_response(StatusCode::BAD_REQUEST, "query is not supported") + } + } + #[cfg(not(feature = "rest_broker"))] + { + json_response(StatusCode::BAD_REQUEST, "query is not supported") + } } } diff --git a/proxy/src/serverless/rest.rs b/proxy/src/serverless/rest.rs index d8aece6710..b3207976f3 100644 --- a/proxy/src/serverless/rest.rs +++ b/proxy/src/serverless/rest.rs @@ -55,7 +55,7 @@ use subzero_core::{ }, parser::postgrest::parse, permissions::check_safe_functions, - schema::{DbSchema, POSTGRESQL_CONFIGURATION_SQL, POSTGRESQL_INTROSPECTION_SQL}, + schema::{DbSchema, POSTGRESQL_INTROSPECTION_SQL, get_postgresql_configuration_query}, }; use tokio_util::sync::CancellationToken; use tracing::{error, info}; @@ -66,7 +66,6 @@ static MAX_SCHEMA_SIZE: usize = 1024 * 1024 * 5; // 5MB static MAX_HTTP_BODY_SIZE: usize = 10 * 1024 * 1024; // 10MB limit static EMPTY_JSON_SCHEMA: &str = r#"{"schemas":[]}"#; const INTROSPECTION_SQL: &str = POSTGRESQL_INTROSPECTION_SQL; -const CONFIGURATION_SQL: &str = POSTGRESQL_CONFIGURATION_SQL; // A wrapper around the DbSchema that allows for self-referencing #[self_referencing] @@ -174,7 +173,8 @@ impl DbSchemaCache { (&RAW_TEXT_OUTPUT, HeaderValue::from_str("true").unwrap()), ]; - let body = serde_json::json!({"query": CONFIGURATION_SQL}); + let query = get_postgresql_configuration_query(Some("pgrst.pre_config")); + let body = serde_json::json!({"query": query}); let (response_status, mut response_json) = make_local_proxy_request(client, headers, body).await?; @@ -641,19 +641,22 @@ async fn handle_inner( "handling interactive connection from client" ); - let endpoint_id = request - .uri() - .host() - .unwrap_or("") - .split('.') - .next() - .unwrap_or(""); + let host = request.uri().host().unwrap_or(""); + let path_parts: Vec<&str> = request.uri().path().split('/').collect(); + // a valid path is /database/rest/v1/... so parts should be ["", "database", "rest", "v1", ...] + let database_name = if path_parts.len() >= 3 && path_parts[1].len() > 0 { + Ok(path_parts[1]) + } else { + Err(RestError::SubzeroCore(NotFound { + target: request.uri().path().to_string(), + })) + }?; // we always use the authenticator role to connect to the database let autheticator_role = "authenticator"; let connection_string = format!( - "postgresql://{}@{}.local.neon.build/database", //FIXME: how do we get the database name knowing only the endpoint id? - autheticator_role, endpoint_id + "postgresql://{}@{}/{}", + autheticator_role, host, database_name ); let conn_info = get_conn_info( @@ -669,9 +672,11 @@ async fn handle_inner( match conn_info.auth { AuthData::Jwt(jwt) if config.authentication_config.is_auth_broker => { + let api_prefix = format!("/{}/rest/v1/", database_name); handle_rest_inner( config, ctx, + &api_prefix, request, &connection_string, conn_info.conn_info, @@ -689,6 +694,7 @@ async fn handle_inner( async fn handle_rest_inner( config: &'static ProxyConfig, ctx: &RequestContext, + api_prefix: &str, request: Request, connection_string: &str, conn_info: ConnInfo, @@ -711,8 +717,6 @@ async fn handle_rest_inner( }; // hardcoded values for now, these should come from a config per tenant - let api_prefix = "/rest/v1/"; - let endpoint_cache_key = conn_info.endpoint_cache_key().unwrap(); let mut client = backend.connect_to_local_proxy(ctx, conn_info).await?; let (parts, originial_body) = request.into_parts();