diff --git a/Cargo.lock b/Cargo.lock index e6b8399b5e..50a8cac8a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -971,7 +971,7 @@ dependencies = [ "bitflags 2.8.0", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.13.0", "log", "prettyplease", "proc-macro2", @@ -1658,6 +1658,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d02f3b0da4c6504f86e9cd789d8dbafab48c2321be74e9987593de5a894d93d" +dependencies = [ + "memchr", +] + [[package]] name = "curve25519-dalek" version = "4.1.3" @@ -3349,6 +3370,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" @@ -3461,6 +3491,17 @@ dependencies = [ "serde_json", ] +[[package]] +name = "jsonpath_lib" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaa63191d68230cccb81c5aa23abd53ed64d83337cacbb25a7b8c7979523774f" +dependencies = [ + "log", + "serde", + "serde_json", +] + [[package]] name = "jsonwebtoken" version = "9.2.0" @@ -4607,11 +4648,11 @@ dependencies = [ [[package]] name = "pem" -version = "3.0.3" +version = "3.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310" +checksum = "38af38e8470ac9dee3ce1bae1af9c1671fffc44ddfd8bd1d0a3445bf349a8ef3" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "serde", ] @@ -5116,7 +5157,7 @@ checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", "heck", - "itertools 0.12.1", + "itertools 0.13.0", "log", "multimap", "once_cell", @@ -5149,7 +5190,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.100", @@ -5224,6 +5265,7 @@ dependencies = [ "itoa", "jose-jwa", "jose-jwk", + "jsonpath_lib", "lasso", "measured", "metrics", @@ -5265,6 +5307,7 @@ dependencies = [ "socket2", "strum_macros", "subtle", + "subzero-core", "thiserror 1.0.69", "tikv-jemalloc-ctl", "tikv-jemallocator", @@ -5535,14 +5578,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.2" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.3", - "regex-syntax 0.8.2", + "regex-automata 0.4.9", + "regex-syntax 0.8.5", ] [[package]] @@ -5556,13 +5599,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.3" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.2", + "regex-syntax 0.8.5", ] [[package]] @@ -5579,9 +5622,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "regex-syntax" -version = "0.8.2" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "relative-path" @@ -6441,6 +6484,7 @@ version = "1.0.125" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed" dependencies = [ + "indexmap 2.9.0", "itoa", "memchr", "ryu", @@ -6663,6 +6707,27 @@ dependencies = [ "serde", ] +[[package]] +name = "snafu" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320b01e011bf8d5d7a4a4a4be966d9160968935849c83b918827f6a435e7f627" +dependencies = [ + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1961e2ef424c1424204d3a5d6975f934f56b6d50ff5732382d84ebf460e147f7" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "socket2" version = "0.5.5" @@ -6930,6 +6995,25 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" +[[package]] +name = "subzero-core" +version = "3.0.1" +dependencies = [ + "base64 0.22.1", + "csv", + "getrandom 0.2.11", + "itertools 0.13.0", + "lazy_static", + "log", + "nom", + "pem", + "regex", + "serde", + "serde_json", + "snafu", + "yasna", +] + [[package]] name = "svg_fmt" version = "0.4.3" @@ -8640,8 +8724,8 @@ dependencies = [ "quote", "rand 0.8.5", "regex", - "regex-automata 0.4.3", - "regex-syntax 0.8.2", + "regex-automata 0.4.9", + "regex-syntax 0.8.5", "reqwest", "rustls 0.23.27", "rustls-pki-types", diff --git a/Cargo.toml b/Cargo.toml index 9110b483a9..597d607393 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -203,7 +203,8 @@ tonic = { version = "0.13.1", default-features = false, features = ["channel", " tonic-reflection = { version = "0.13.1", features = ["server"] } tower = { version = "0.5.2", default-features = false } tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] } -subzero-core = { version = "3.0.1", path = "../subzero/core" } +subzero-core = { version = "3.0.1", path = "../subzero/core", features = ["postgresql"] } +jsonpath_lib = "0.3.0" # This revision uses opentelemetry 0.27. There's no tag for it. tower-otel = { git = "https://github.com/mattiapenati/tower-otel", rev = "56a7321053bcb72443888257b622ba0d43a11fcd" } diff --git a/local_proxy.json b/local_proxy.json index 28591897c6..0b77498af1 100644 --- a/local_proxy.json +++ b/local_proxy.json @@ -2,7 +2,7 @@ "jwks": [ { "id": "1", - "role_names": ["authenticated"], + "role_names": ["authenticator", "authenticated", "anon"], "jwks_url": "https://climbing-minnow-11.clerk.accounts.dev/.well-known/jwks.json", "provider_name": "foo", "jwt_audience": null diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index ce8610be24..8568472162 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -103,6 +103,8 @@ uuid.workspace = true x509-cert.workspace = true redis.workspace = true zerocopy.workspace = true +subzero-core.workspace = true +jsonpath_lib.workspace = true # jwt stuff jose-jwa = "0.1.2" diff --git a/proxy/src/control_plane/client/cplane_proxy_v1.rs b/proxy/src/control_plane/client/cplane_proxy_v1.rs index 78e482410b..3472e9ce0a 100644 --- a/proxy/src/control_plane/client/cplane_proxy_v1.rs +++ b/proxy/src/control_plane/client/cplane_proxy_v1.rs @@ -396,7 +396,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { .parse() .expect("url is valid"), audience: None, - role_names: vec![(&RoleName::from("authenticated")).into()], + role_names: vec![(&RoleName::from("authenticator")).into(), (&RoleName::from("authenticated")).into(), (&RoleName::from("anon")).into()], }]); } diff --git a/proxy/src/control_plane/client/mod.rs b/proxy/src/control_plane/client/mod.rs index 4e5f5c7899..e5dbf6c851 100644 --- a/proxy/src/control_plane/client/mod.rs +++ b/proxy/src/control_plane/client/mod.rs @@ -229,7 +229,7 @@ impl ApiLocks { // temporary lock a single shard and then clear any semaphores that aren't currently checked out // race conditions: if strong_count == 1, there's no way that it can increase while the shard is locked // therefore releasing it is safe from race conditions - info!( + debug!( //FIXME: is anything depending on this being info? name = self.name, shard = i, "performing epoch reclamation on api lock" diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index e5a309f552..f16ed2f37e 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -115,7 +115,7 @@ impl PoolingBackend { match &self.auth_backend { crate::auth::Backend::ControlPlane(console, ()) => { - self.config + let keys = self.config .authentication_config .jwks_cache .check_jwt( @@ -129,7 +129,9 @@ impl PoolingBackend { Ok(ComputeCredentials { info: user_info.clone(), - keys: crate::auth::backend::ComputeCredentialKeys::None, + // FIXME: why was this set to None? + //keys: crate::auth::backend::ComputeCredentialKeys::None, + keys, }) } crate::auth::Backend::Local(_) => { diff --git a/proxy/src/serverless/rest.rs b/proxy/src/serverless/rest.rs index 99c7b3173f..f453e44ae9 100644 --- a/proxy/src/serverless/rest.rs +++ b/proxy/src/serverless/rest.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::sync::Arc; use bytes::Bytes; @@ -14,7 +15,7 @@ use indexmap::IndexMap; use postgres_client::error::{DbError, ErrorPosition, SqlState}; -use serde_json::value::RawValue; +use serde_json::{value::RawValue, Value as JsonValue}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info}; @@ -28,8 +29,8 @@ use super::conn_pool_lib::{ConnInfo}; use super::error::HttpCodeError; use super::http_util::json_response; use super::json::{JsonConversionError}; -use crate::auth::backend::{ComputeUserInfo}; -use crate::auth::{ComputeUserInfoParseError, endpoint_sni}; +use crate::auth::backend::{ComputeUserInfo, ComputeCredentialKeys}; +use crate::auth::{ComputeUserInfoParseError, endpoint_sni, }; use crate::config::{AuthenticationConfig, ProxyConfig, TlsConfig}; use crate::context::RequestContext; use crate::error::{ErrorKind, ReportableError, UserFacingError}; @@ -40,6 +41,30 @@ use crate::proxy::NeonOptions; use crate::serverless::backend::HttpConnError; use crate::types::{DbName, RoleName}; +use subzero_core::{ + api::{ApiRequest, ApiResponse, ContentType::*, SingleVal, ListVal, Payload}, + error::Error::{self as SubzeroCoreError, SingularityError, PutMatchingPkError, PermissionDenied, JsonDeserialize, NotFound, JwtTokenInvalid,}, + schema::DbSchema, + formatter::{ + Param, + Param::*, + postgresql::{fmt_main_query, generate}, + ToParam, Snippet, SqlParam, + }, + error::JsonDeserializeSnafu, + dynamic_statement::{param, sql, JoinIterator}, +}; +use subzero_core::{ + api::{ContentType, ContentType::*, Preferences, QueryNode::*, Representation, Resolution::*,}, + error::{*}, + parser::postgrest::parse, + permissions::{check_safe_functions, check_privileges, insert_policy_conditions, replace_select_star}, + api::DEFAULT_SAFE_SELECT_FUNCTIONS, +}; +use std::collections::HashMap; +use jsonpath_lib::select; +use url::form_urlencoded; + @@ -55,6 +80,9 @@ static TXN_DEFERRABLE: HeaderName = HeaderName::from_static("neon-batch-deferrab static HEADER_VALUE_TRUE: HeaderValue = HeaderValue::from_static("true"); +// FIXME: remove this header +static HACK_TRUST_ROLE_SWITCHING: HeaderName = HeaderName::from_static("neon-hack-trust-role-switching"); + #[derive(Debug, thiserror::Error)] @@ -416,6 +444,8 @@ pub(crate) enum RestError { JsonConversion(#[from] JsonConversionError), #[error("{0}")] Cancelled(SqlOverHttpCancel), + #[error("{0}")] + SubzeroCore(#[source] SubzeroCoreError), } impl ReportableError for RestError { @@ -436,6 +466,7 @@ impl ReportableError for RestError { } RestError::JsonConversion(_) => ErrorKind::Postgres, RestError::Cancelled(c) => c.get_error_kind(), + RestError::SubzeroCore(s) => ErrorKind::User, } } } @@ -452,6 +483,18 @@ impl UserFacingError for RestError { RestError::InternalPostgres(p) => p.to_string(), RestError::JsonConversion(_) => "could not parse postgres response".to_string(), RestError::Cancelled(_) => self.to_string(), + RestError::SubzeroCore(s) => { + // TODO: this is a hack to get the message from the json body + let json = s.json_body(); + let default_message = "Unknown error".to_string(); + let message = json.get("message").map_or(default_message.clone(), |m| + match m { + JsonValue::String(s) => s.clone(), + _ => default_message, + } + ); + message + } } } } @@ -471,6 +514,10 @@ impl HttpCodeError for RestError { RestError::InternalPostgres(_) => StatusCode::INTERNAL_SERVER_ERROR, RestError::JsonConversion(_) => StatusCode::INTERNAL_SERVER_ERROR, RestError::Cancelled(_) => StatusCode::INTERNAL_SERVER_ERROR, + RestError::SubzeroCore(e) => { + let status = e.status_code(); + StatusCode::from_u16(status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR) + } } } } @@ -550,9 +597,10 @@ async fn handle_inner( let host = request.uri().host().unwrap_or("").split('.').next().unwrap_or(""); - let connection_string = format!("postgresql://authenticated@{}.local.neon.build/database", host); - - + + // we always use the authenticator role to connect to the database + let autheticator_role = "authenticator"; + let connection_string = format!("postgresql://{}@{}.local.neon.build/database", autheticator_role, host); let conn_info = get_conn_info( &config.authentication_config, @@ -595,6 +643,125 @@ static HEADERS_TO_STRIP: &[&HeaderName] = &[ &TXN_READ_ONLY, &TXN_DEFERRABLE, ]; + +static JSON_SCHEMA: &str = r#" + { + "schemas":[ + { + "name":"test", + "objects":[ + { + "kind":"table", + "name":"items", + "columns":[ + { + "name":"id", + "data_type":"integer", + "primary_key":true + }, + { + "name":"name", + "data_type":"text" + } + ], + "foreign_keys":[], + "permissions":[] + } + ] + } + ] + } +"#; + +pub fn fmt_env_query<'a>(env: &'a HashMap<&'a str, &'a str>) -> Snippet<'a> { + "select " + + if env.is_empty() { + sql("null") + } else { + env.iter() + .map(|(k, v)| "set_config(" + param(k as &SqlParam) + ", " + param(v as &SqlParam) + ", true)") + .join(",") + } +} + +fn current_schema(db_schemas: &Vec, method: &Method, headers: &HeaderMap) -> Result { + match (db_schemas.len() > 1, method, headers.get("accept-profile"), headers.get("content-profile")) { + (false, ..) => Ok(db_schemas.first().unwrap_or(&"_inexistent_".to_string()).clone()), + (_, &Method::DELETE, _, Some(content_profile_header)) + | (_, &Method::POST, _, Some(content_profile_header)) + | (_, &Method::PATCH, _, Some(content_profile_header)) + | (_, &Method::PUT, _, Some(content_profile_header)) => { + match content_profile_header.to_str() { + Ok(content_profile_str) => { + let content_profile = String::from(content_profile_str); + if db_schemas.contains(&content_profile) { + Ok(content_profile) + } else { + Err(SubzeroCoreError::UnacceptableSchema { + schemas: db_schemas.clone(), + }) + } + } + Err(_) => Err(SubzeroCoreError::UnacceptableSchema { + schemas: db_schemas.clone(), + }) + } + } + (_, _, Some(accept_profile_header), _) => { + match accept_profile_header.to_str() { + Ok(accept_profile_str) => { + let accept_profile = String::from(accept_profile_str); + if db_schemas.contains(&accept_profile) { + Ok(accept_profile) + } else { + Err(SubzeroCoreError::UnacceptableSchema { + schemas: db_schemas.clone(), + }) + } + } + Err(_) => Err(SubzeroCoreError::UnacceptableSchema { + schemas: db_schemas.clone(), + }) + } + } + _ => Ok(db_schemas.first().unwrap_or(&"_inexistent_".to_string()).clone()), + } +} +pub fn to_core_error(e: SubzeroCoreError) -> RestError { + RestError::SubzeroCore(e) +} + + +fn to_sql_param(p: &Param) -> JsonValue { + match p { + SV(SingleVal(v, ..)) => { + JsonValue::String(v.to_string()) + } + Str(v) => { + JsonValue::String(v.to_string()) + } + StrOwned(v) => { + JsonValue::String((*v).clone()) + } + PL(Payload(v, ..)) => { + JsonValue::String(v.clone().into_owned()) + } + LV(ListVal(v, ..)) => { + if !v.is_empty() { + JsonValue::String(format!( + "{{\"{}\"}}", + v.iter() + .map(|e| e.replace('\\', "\\\\").replace('\"', "\\\"")) + .collect::>() + .join("\",\"") + )) + } else { + JsonValue::String(r#"{}"#.to_string()) + } + } + } +} + async fn handle_rest_inner( ctx: &RequestContext, request: Request, @@ -603,16 +770,200 @@ async fn handle_rest_inner( jwt: String, backend: Arc, ) -> Result>, RestError> { - backend - .authenticate_with_jwt(ctx, &conn_info.user_info, jwt) + let mut response_headers = vec![]; + // hardcoded values for now + let max_http_body_size = 10 * 1024 * 1024; // 10MB limit + + let db_schemas = Vec::from(["test".to_string()]); // list of schemas available for the api + let mut db_schema_parsed = serde_json::from_str::(JSON_SCHEMA) // database schema shape (will come from introspection) + .map_err(|e| RestError::SubzeroCore(JsonDeserialize {source:e }))?; + let disable_internal_permissions = true; // in the context of neon we emulate postgrest (so no internal permissions checks) + db_schema_parsed.use_internal_permissions = false; // TODO: change the introspection query to auto set this to false depending on params + let db_schema = &db_schema_parsed; + let api_prefix = "/rest/v1/"; + let db_extra_search_path = "public, extensions".to_string(); + let role_claim_key = ".role".to_string(); + let role_claim_path = format!("${}", role_claim_key); + println!("role_claim_path: {:?}", role_claim_path); + let db_anon_role = Some("anon".to_string()); + //let max_rows = Some("1000".to_string()); + let max_rows = None; + let db_allowed_select_functions = DEFAULT_SAFE_SELECT_FUNCTIONS.iter().map(|m| *m).collect::>(); + + let jwt_parsed = backend + .authenticate_with_jwt(ctx, &conn_info.user_info, jwt.clone()) //TODO: do not clone jwt .await .map_err(HttpConnError::from)?; + let jwt_claims = match jwt_parsed.keys { + ComputeCredentialKeys::JwtPayload(payload_bytes) => { + // `payload_bytes` contains the raw JWT payload as Vec + // You can deserialize it back to JSON or parse specific claims + let payload: serde_json::Value = serde_json::from_slice(&payload_bytes) + .map_err(|e| RestError::SubzeroCore(JsonDeserialize {source:e }))?; + Some(payload) + }, + _ => { + None + } + }; + println!("jwt_payload: {:?}", &jwt_claims); + let (role, authenticated) = match &jwt_claims { + Some(claims) => match select(claims, &role_claim_path) { + Ok(v) => match &v[..] { + [JsonValue::String(s)] => Ok((Some(s), true)), + _ => Ok((db_anon_role.as_ref(), true)), + }, + Err(e) => Err(RestError::SubzeroCore(JwtTokenInvalid { message: format!("{e}") })), + }, + None => Ok((db_anon_role.as_ref(), false)), + }?; + println!("role: {:?}", role); + println!("authenticated: {:?}", authenticated); + // do not allow unauthenticated requests when there is no anonymous role setup + if let (None, false) = (role, authenticated) { + return Err(RestError::SubzeroCore(JwtTokenInvalid { + message: "unauthenticated requests not allowed".to_string(), + })); + } + // println!("jwt: {:?}", jwt.keys); + + let role = match role { + Some(r) => r, + None => "", + }; + + + let (parts, originial_body) = request.into_parts(); + let method = parts.method.to_string(); + let path = parts.uri.path_and_query().map(|pq| pq.as_str()).unwrap_or("/"); + + // this is actually the table name (or rpc/function_name) + // TODO: rename this to something more descriptive + let root = match parts.uri.path().strip_prefix(api_prefix) { + Some(p) => Ok(p), + None => Err(RestError::SubzeroCore(NotFound { + target: parts.uri.path().to_string(), + })), + }?; + + let schema_name = ¤t_schema(&db_schemas, &parts.method, &parts.headers).map_err(RestError::SubzeroCore)?; + if db_schemas.len() > 1 { + response_headers.push(("Content-Profile".to_string(), schema_name.clone())); + } + + let body = Full::new(Bytes::new()).map_err(|never| match never {}).boxed(); + + + // print all the local variables + println!("schema_name: {:?}", schema_name); + println!("db_schemas: {:?}", db_schemas); + println!("db_schema: {:?}", db_schema); + println!("root: {:?}", root); + println!("method: {:?}", method); + println!("path: {:?}", path); + println!("response_headers: {:?}", response_headers); + println!("originial_body: {:?}", originial_body); + //println!("parts: {:?}", parts); + + println!("conn_info: {:?}", conn_info); + println!("jwt: {:?}", jwt); + + + let query = match parts.uri.query() { + Some(q) => form_urlencoded::parse(q.as_bytes()).collect(), + None => vec![], + }; + let get: Vec<(&str, &str)> = query.iter().map(|(k, v)| (&**k, &**v)).collect(); + + let headers_map = parts.headers; + let headers: HashMap<&str, &str> = headers_map.iter() + .map(|(k, v)| (k.as_str(), v.to_str().unwrap_or("__BAD_HEADER__"))) + .collect(); + + let cookies = HashMap::new(); // TODO: add cookies + + // Read the request body + let body_bytes = read_body_with_limit(originial_body, max_http_body_size).await.map_err(ReadPayloadError::from)?; // 10MB limit + let body_as_string: Option = if body_bytes.is_empty() { + None + } else { + Some(String::from_utf8_lossy(&body_bytes).into_owned()) + }; + + println!("ready to parse!!!!!!!"); + let mut api_request = parse(schema_name, root, db_schema, method.as_str(), path, get, body_as_string.as_deref(), headers, cookies, max_rows).map_err(RestError::SubzeroCore)?; + + + + // in case when the role is not set (but authenticated through jwt) the query will be executed with the privileges + // of the "authenticator" role unless the DbSchema has internal privileges set + + // replace "*" with the list of columns the user has access to + // so that he does not encounter permission errors + // replace_select_star(db_schema, schema_name, role, &mut api_request.query).map_err(to_core_error)?; + println!("after replace_select_star !!!!!!!"); + + if !disable_internal_permissions { + // check_privileges(db_schema, schema_name, role, &api_request).map_err(to_core_error)?; + println!("after check_privileges !!!!!!!"); + } + println!("after check_privileges 2 !!!!!!!"); + check_safe_functions(&api_request, &db_allowed_select_functions).map_err(to_core_error)?; + println!("after check_safe_functions !!!!!!!"); + if !disable_internal_permissions { + insert_policy_conditions(db_schema, schema_name, role, &mut api_request.query).map_err(to_core_error)?; + println!("after insert_policy_conditions !!!!!!!"); + } + + println!("api_request after checks: {:?}", api_request); + + // when using internal privileges not switch "current_role" + // TODO: why do we need this? + let env_role = if !disable_internal_permissions && db_schema.use_internal_permissions { + None + } else { + Some(role) + }; + + let empty_json = "{}".to_string(); + let headers_env = serde_json::to_string(&api_request.headers).unwrap_or(empty_json.clone()); + let cookies_env = serde_json::to_string(&api_request.cookies).unwrap_or(empty_json.clone()); + let get_env = serde_json::to_string(&api_request.get).unwrap_or(empty_json.clone()); + let jwt_claims_env = jwt_claims.as_ref().map(|v| serde_json::to_string(v).unwrap_or(empty_json.clone())) + .unwrap_or( + if let Some(r) = env_role { + let claims: HashMap<&str, &str> = HashMap::from([("role", r)]); + serde_json::to_string(&claims).unwrap_or(empty_json.clone()) + } else { + empty_json.clone() + } + ); + let mut env: HashMap<&str, &str> = HashMap::from([ + ("request.method", api_request.method), + ("request.path", api_request.path), + ("search_path", &db_extra_search_path), + ("request.headers", &headers_env), + ("request.cookies", &cookies_env), + ("request.get", &get_env), + ("request.jwt.claims", &jwt_claims_env), + ]); + if let Some(r) = env_role { + env.insert("role", r.into()); + } + let (env_statement, env_parameters, _) = generate(fmt_env_query(&env)); + let (main_statement, main_parameters, _) = generate(fmt_main_query(db_schema, api_request.schema_name, &api_request, &env).map_err(to_core_error)?); + + println!("env_statement: {:?} \n env_parameters: {:?}", env_statement, env_parameters); + println!("main_statement: {:?} \n main_parameters: {:?}", main_statement, main_parameters); + + + // now we are ready to send the request to the local proxy let mut client = backend.connect_to_local_proxy(ctx, conn_info).await?; let local_proxy_uri = ::http::Uri::from_static("http://proxy.local/sql"); - let (parts, _originial_body) = request.into_parts(); + let mut req = Request::builder().method(Method::POST).uri(local_proxy_uri); // todo(conradludgate): maybe auth-broker should parse these and re-serialize @@ -623,17 +974,33 @@ async fn handle_rest_inner( // } // } // forward all headers except the ones in HEADERS_TO_STRIP - for (h, v) in parts.headers.iter() { + for (h, v) in headers_map.iter() { if !HEADERS_TO_STRIP.contains(&h) { req = req.header(h, v); } } req = req.header(&NEON_REQUEST_ID, uuid_to_header_value(ctx.session_id())); req = req.header(&CONN_STRING, HeaderValue::from_str(connection_string).unwrap()); + + // FIXME: remove this header + req = req.header(&HACK_TRUST_ROLE_SWITCHING, HeaderValue::from_static("true")); + + + + let env_parameters_json = env_parameters.iter().map(|p| to_sql_param(&p.to_param())).collect::>(); + let main_parameters_json = main_parameters.iter().map(|p| to_sql_param(&p.to_param())).collect::>(); let body: String = json!({ - "query": "select 1 as one", - "params": [], + "queries": [ + { + "query": env_statement, + "params": env_parameters_json, + }, + { + "query": main_statement, + "params": main_parameters_json, + } + ] }).to_string(); let body_boxed = Full::new(Bytes::from(body)) @@ -656,6 +1023,12 @@ async fn handle_rest_inner( .map_err(LocalProxyConnError::from) .map_err(HttpConnError::from)? .map(|b| b.boxed())) + + // Ok(Response::builder() + // .status(StatusCode::OK) + // .body(body) + // .unwrap()) + } diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index f88263944a..de99486413 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -869,7 +869,7 @@ async fn handle_auth_broker_inner( req = req.header(&NEON_REQUEST_ID, uuid_to_header_value(ctx.session_id())); let req = req - .body(body.map_err(|e| e).boxed()) + .body(body.map_err(|e| e).boxed()) //TODO: is there a potential for a regression here? .expect("all headers and params received via hyper should be valid for request"); // todo: map body to count egress diff --git a/proxy/subzero/sql/include/test.sql b/proxy/subzero/sql/include/test.sql index 056d7bad9f..7bf3781505 100644 --- a/proxy/subzero/sql/include/test.sql +++ b/proxy/subzero/sql/include/test.sql @@ -10,8 +10,8 @@ INSERT INTO test.items (name) VALUES ('test item 2'), ('test item 3'); -CREATE ROLE test_role NOLOGIN; -GRANT test_role TO authenticator; +CREATE ROLE authenticated NOLOGIN; +GRANT authenticated TO authenticator; -GRANT USAGE ON SCHEMA test TO test_role; -GRANT ALL ON ALL TABLES IN SCHEMA test TO test_role; \ No newline at end of file +GRANT USAGE ON SCHEMA test TO authenticated; +GRANT ALL ON ALL TABLES IN SCHEMA test TO authenticated; \ No newline at end of file