subzero integration WIP6

beginning work on introspection of config and schema shape from the database
This commit is contained in:
Ruslan Talpa
2025-06-26 14:42:31 +03:00
parent fbb2416685
commit d6c36d103e
6 changed files with 288 additions and 16 deletions

70
Cargo.lock generated
View File

@@ -52,6 +52,12 @@ dependencies = [
"memchr",
]
[[package]]
name = "aliasable"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd"
[[package]]
name = "aligned-vec"
version = "0.6.1"
@@ -1209,7 +1215,7 @@ version = "4.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab"
dependencies = [
"heck",
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.100",
@@ -1956,7 +1962,7 @@ checksum = "0892a17df262a24294c382f0d5997571006e7a4348b4327557c4ff1cd4a8bccc"
dependencies = [
"darling",
"either",
"heck",
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.100",
@@ -2733,6 +2739,12 @@ dependencies = [
"http 1.1.0",
]
[[package]]
name = "heck"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "heck"
version = "0.5.0"
@@ -3689,7 +3701,7 @@ version = "0.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e6777fc80a575f9503d908c8b498782a6c3ee88a06cb416dc3941401e43b94"
dependencies = [
"heck",
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.100",
@@ -4227,6 +4239,30 @@ dependencies = [
"winapi",
]
[[package]]
name = "ouroboros"
version = "0.18.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e0f050db9c44b97a94723127e6be766ac5c340c48f2c4bb3ffa11713744be59"
dependencies = [
"aliasable",
"ouroboros_macro",
"static_assertions",
]
[[package]]
name = "ouroboros_macro"
version = "0.18.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c7028bdd3d43083f6d8d4d5187680d0d3560d54df4cc9d752005268b41e64d0"
dependencies = [
"heck 0.4.1",
"proc-macro2",
"proc-macro2-diagnostics",
"quote",
"syn 2.0.100",
]
[[package]]
name = "outref"
version = "0.5.1"
@@ -5095,6 +5131,19 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "proc-macro2-diagnostics"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
"version_check",
"yansi",
]
[[package]]
name = "procfs"
version = "0.16.0"
@@ -5164,7 +5213,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
dependencies = [
"bytes",
"heck",
"heck 0.5.0",
"itertools 0.12.1",
"log",
"multimap",
@@ -5185,7 +5234,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
dependencies = [
"bytes",
"heck",
"heck 0.5.0",
"itertools 0.13.0",
"log",
"multimap",
@@ -6764,7 +6813,7 @@ version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1961e2ef424c1424204d3a5d6975f934f56b6d50ff5732382d84ebf460e147f7"
dependencies = [
"heck",
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.100",
@@ -7026,7 +7075,7 @@ version = "0.26.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be"
dependencies = [
"heck",
"heck 0.5.0",
"proc-macro2",
"quote",
"rustversion",
@@ -7050,6 +7099,7 @@ dependencies = [
"lazy_static",
"log",
"nom",
"ouroboros",
"pem",
"regex",
"serde",
@@ -8869,6 +8919,12 @@ version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d25c75bf9ea12c4040a97f829154768bbbce366287e2dc044af160cd79a13fd"
[[package]]
name = "yansi"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049"
[[package]]
name = "yasna"
version = "0.5.2"

View File

@@ -284,6 +284,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
},
rest_config: RestConfig {
is_rest_broker: false,
db_schema_cache: None,
},
proxy_protocol_v2: config::ProxyProtocolV2::Rejected,
handshake_timeout: Duration::from_secs(10),

View File

@@ -39,6 +39,7 @@ use crate::redis::{elasticache, notifications};
use crate::scram::threadpool::ThreadPool;
use crate::serverless::GlobalConnPoolOptions;
use crate::serverless::cancel_set::CancelSet;
use crate::serverless::rest::DbSchemaCache;
use crate::tls::client_config::compute_client_config_with_root_certs;
#[cfg(any(test, feature = "testing"))]
use crate::url::ApiUrl;
@@ -238,6 +239,10 @@ struct ProxyCliArgs {
/// if this is not local proxy, this toggles whether we accept Postgres REST requests
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
is_rest_broker: bool,
/// cache for `db_schema_cache` introspection (use `size=0` to disable)
#[clap(long, default_value = "size=4000,ttl=1h")]
db_schema_cache: String,
}
#[derive(clap::Args, Clone, Copy, Debug)]
@@ -678,8 +683,22 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
timeout: Duration::from_secs(2),
};
let db_schema_cache_config: CacheOptions = args.db_schema_cache.parse()?;
info!("Using DbSchemaCache with options={db_schema_cache_config:?}");
let db_schema_cache = if args.is_rest_broker {
Some(DbSchemaCache::new(
"db_schema_cache",
db_schema_cache_config.size,
db_schema_cache_config.ttl,
true,
))
} else {
None
};
let rest_config = RestConfig {
is_rest_broker: args.is_rest_broker,
db_schema_cache,
};
let config = ProxyConfig {

View File

@@ -15,6 +15,7 @@ use crate::serverless::GlobalConnPoolOptions;
use crate::serverless::cancel_set::CancelSet;
pub use crate::tls::server_config::{TlsConfig, configure_tls};
use crate::types::Host;
use crate::serverless::rest::DbSchemaCache;
pub struct ProxyConfig {
pub tls_config: ArcSwapOption<TlsConfig>,
@@ -74,6 +75,7 @@ pub struct AuthenticationConfig {
pub struct RestConfig {
pub is_rest_broker: bool,
pub db_schema_cache: Option<DbSchemaCache>,
}
#[derive(Debug)]

View File

@@ -12,7 +12,7 @@ mod http_util;
mod json;
mod local_conn_pool;
mod sql_over_http;
mod rest;
pub mod rest;
mod websocket;
use std::net::{IpAddr, SocketAddr};

View File

@@ -11,8 +11,8 @@ use hyper::body::Incoming;
use hyper::http::{HeaderName, HeaderValue};
use hyper::{HeaderMap, Request, Response, StatusCode};
use indexmap::IndexMap;
use serde::{Deserialize, Deserializer};
use super::http_conn_pool::{self, HttpConnPool, Send, poll_http2_client};
use serde_json::{value::RawValue, Value as JsonValue};
use tokio_util::sync::CancellationToken;
@@ -38,11 +38,13 @@ use crate::pqproto::StartupMessageParams;
use crate::proxy::NeonOptions;
use crate::serverless::backend::HttpConnError;
use crate::types::{DbName, RoleName};
use crate::cache::{Cached, TimedLru};
use crate::types::{EndpointCacheKey, EndpointId};
use subzero_core::{
api::{SingleVal, ListVal, Payload},
error::Error::{self as SubzeroCoreError, JsonDeserialize, NotFound, JwtTokenInvalid, InternalError, GucHeadersError, GucStatusError, ContentTypeError},
schema::DbSchema,
schema::{DbSchema, DbSchemaOwned},
formatter::{
Param,
Param::*,
@@ -50,6 +52,7 @@ use subzero_core::{
Snippet, SqlParam,
},
dynamic_statement::{param, sql, JoinIterator},
config::{db_schemas, db_allowed_select_functions, role_claim_key, to_tuple},
};
use subzero_core::{
api::{ContentType::*, Preferences, QueryNode::*, Representation, Resolution::*, },
@@ -91,6 +94,8 @@ static JSON_SCHEMA: &str = r#"
]
}
"#;
const INTROSPECTION_SQL: &str = include_str!("../../../../subzero/introspection/postgresql_introspection_query.sql");
const CONFIGURATION_SQL: &str = include_str!("../../../../subzero/introspection/postgresql_configuration_query.sql");
static DB_SCHEMA: std::sync::LazyLock<DbSchema> = std::sync::LazyLock::new(|| {
let mut schema = serde_json::from_str::<DbSchema>(JSON_SCHEMA)
@@ -99,12 +104,186 @@ static DB_SCHEMA: std::sync::LazyLock<DbSchema> = std::sync::LazyLock::new(|| {
schema
});
fn deserialize_comma_separated<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
Ok(s.split(',')
.map(|s| s.trim().to_string())
.collect())
}
#[derive(Deserialize, Debug)]
pub struct ApiConfig {
#[serde(default = "db_schemas", deserialize_with = "deserialize_comma_separated")]
pub db_schemas: Vec<String>,
pub db_anon_role: Option<String>,
pub db_max_rows: Option<String>,
#[serde(default = "db_allowed_select_functions")]
pub db_allowed_select_functions: Vec<String>,
#[serde(deserialize_with = "to_tuple", default)]
pub db_pre_request: Option<(String, String)>,
#[serde(default = "role_claim_key")]
pub role_claim_key: String,
}
pub(crate) type DbSchemaCache = TimedLru<EndpointCacheKey, DbSchemaOwned>;
pub(crate) type CachedSchema = Cached<&'static DbSchemaCache, DbSchemaOwned>;
impl DbSchemaCache {
pub async fn get_remote(&self,
endpoint_id: &EndpointCacheKey,
auth_header: &HeaderValue,
connection_string: &str,
client: &mut http_conn_pool::Client<Send>,
ctx: &RequestContext,
) -> Result<(), RestError> {
let local_proxy_uri = ::http::Uri::from_static("http://proxy.local/sql");
let mut req = Request::builder().method(Method::POST).uri(local_proxy_uri);
req = req.header(&NEON_REQUEST_ID, uuid_to_header_value(ctx.session_id()));
req = req.header(&CONN_STRING, HeaderValue::from_str(connection_string).unwrap());
req = req.header(&TXN_ISOLATION_LEVEL, HeaderValue::from_str("ReadCommitted").unwrap());
req = req.header(AUTHORIZATION, auth_header);
//req = req.header(&ARRAY_MODE, HeaderValue::from_str("true").unwrap());
req = req.header(&RAW_TEXT_OUTPUT, HeaderValue::from_str("true").unwrap());
let body = json!({"query": CONFIGURATION_SQL}).to_string();
let body_boxed = Full::new(Bytes::from(body))
.map_err(|never| match never {}) // Convert Infallible to hyper::Error
.boxed();
let req = req
.body(body_boxed)
.map_err(|_| RestError::SubzeroCore(InternalError {
message: "Failed to build request".to_string()
}))?;
// send the request to the local proxy
let response = client
.inner
.inner
.send_request(req)
.await
.map_err(LocalProxyConnError::from)
.map_err(HttpConnError::from)?;
let response_status = response.status();
// Capture the response body
let response_body = response
.collect()
.await
.map_err(ReadPayloadError::from)?
.to_bytes();
//println!("response_body: {:?}", response_body);
let mut response_json: serde_json::Value = serde_json::from_slice(&response_body)
.map_err(|e| RestError::SubzeroCore(JsonDeserialize { source: e }))?;
//println!("response_json: {:?}", response_json);
let rows = response_json["rows"].as_array_mut()
.ok_or_else(|| RestError::SubzeroCore(InternalError {
message: "Missing 'rows' array in second result".to_string()
}))?;
//println!("rows: {:?}", rows);
if rows.is_empty() {
return Err(RestError::SubzeroCore(InternalError {
message: "No rows in second result".to_string()
}));
}
// Extract columns from the first (and only) row
let mut row = &mut rows[0];
let config_string = extract_string(&mut row, "config").unwrap_or_default();
println!("config_string: {:?}", config_string);
// Parse the JSON response and extract the body content efficiently
let api_config: ApiConfig = serde_json::from_str(&config_string)
.map_err(|e| RestError::SubzeroCore(JsonDeserialize { source: e }))?;
println!("api_config: {:?}", api_config);
// now that we have the api_config let's run the second INTROSPECTION_SQL query
let local_proxy_uri = ::http::Uri::from_static("http://proxy.local/sql");
let mut req = Request::builder().method(Method::POST).uri(local_proxy_uri);
req = req.header(&NEON_REQUEST_ID, uuid_to_header_value(ctx.session_id()));
req = req.header(&CONN_STRING, HeaderValue::from_str(connection_string).unwrap());
req = req.header(&TXN_ISOLATION_LEVEL, HeaderValue::from_str("ReadCommitted").unwrap());
req = req.header(AUTHORIZATION, auth_header);
//req = req.header(&ARRAY_MODE, HeaderValue::from_str("true").unwrap());
req = req.header(&RAW_TEXT_OUTPUT, HeaderValue::from_str("true").unwrap());
let body = json!({
"query": INTROSPECTION_SQL,
"params": [
api_config.db_schemas,
false
]
}).to_string();
let body_boxed = Full::new(Bytes::from(body))
.map_err(|never| match never {}) // Convert Infallible to hyper::Error
.boxed();
let req = req
.body(body_boxed)
.map_err(|_| RestError::SubzeroCore(InternalError {
message: "Failed to build request".to_string()
}))?;
// send the request to the local proxy
let response = client
.inner
.inner
.send_request(req)
.await
.map_err(LocalProxyConnError::from)
.map_err(HttpConnError::from)?;
let response_status = response.status();
let response_body = response
.collect()
.await
.map_err(ReadPayloadError::from)?
.to_bytes();
//println!("second response_body: {:?}", response_body);
let mut response_json: serde_json::Value = serde_json::from_slice(&response_body)
.map_err(|e| RestError::SubzeroCore(JsonDeserialize { source: e }))?;
//println!("response_json: {:?}", response_json);
let rows = response_json["rows"].as_array_mut()
.ok_or_else(|| RestError::SubzeroCore(InternalError {
message: "Missing 'rows' array in second result".to_string()
}))?;
//println!("rows: {:?}", rows);
if rows.is_empty() {
return Err(RestError::SubzeroCore(InternalError {
message: "No rows in second result".to_string()
}));
}
// Extract columns from the first (and only) row
let mut row = &mut rows[0];
let json_schema = extract_string(&mut row, "json_schema").unwrap_or_default();
println!("json_schema: {:?}", json_schema);
// Parse the JSON response and extract the body content efficiently
let mut schema = serde_json::from_str::<DbSchema>(&json_schema)
.map_err(|e| RestError::SubzeroCore(JsonDeserialize { source: e }))?;
schema.use_internal_permissions = false;
println!("schema!!!!!: {:?}", schema);
Ok(())
}
}
pub(super) static NEON_REQUEST_ID: HeaderName = HeaderName::from_static("neon-request-id");
static CONN_STRING: HeaderName = HeaderName::from_static("neon-connection-string");
//static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
//static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
static ALLOW_POOL: HeaderName = HeaderName::from_static("neon-pool-opt-in");
static TXN_ISOLATION_LEVEL: HeaderName = HeaderName::from_static("neon-batch-isolation-level");
static TXN_READ_ONLY: HeaderName = HeaderName::from_static("neon-batch-read-only");
@@ -757,11 +936,11 @@ async fn handle_inner(
);
let host = request.uri().host().unwrap_or("").split('.').next().unwrap_or("");
let endpoint_id = request.uri().host().unwrap_or("").split('.').next().unwrap_or("");
// we always use the authenticator role to connect to the database
let autheticator_role = "authenticator";
let connection_string = format!("postgresql://{}@{}.local.neon.build/database", autheticator_role, host);
let connection_string = format!("postgresql://{}@{}.local.neon.build/database", autheticator_role, endpoint_id);
let conn_info = get_conn_info(
&config.authentication_config,
@@ -779,7 +958,7 @@ async fn handle_inner(
match conn_info.auth {
AuthData::Jwt(jwt) if config.authentication_config.is_auth_broker => {
handle_rest_inner(ctx, request, &connection_string, conn_info.conn_info, jwt, backend).await
handle_rest_inner(config, ctx, request, &connection_string, conn_info.conn_info, jwt, backend).await
}
_ => {
Err(RestError::ConnInfo(ConnInfoError::MissingCredentials(Credentials::Password)))
@@ -797,6 +976,7 @@ fn extract_string(json: &mut serde_json::Value, key: &str) -> Option<String> {
}
async fn handle_rest_inner(
config: &'static ProxyConfig,
ctx: &RequestContext,
request: Request<Incoming>,
connection_string: &str,
@@ -986,7 +1166,21 @@ async fn handle_rest_inner(
let (main_statement, main_parameters, _) = generate(fmt_main_query(db_schema, api_request.schema_name, &api_request, &env).map_err(to_core_error)?);
// now we are ready to build the request to the local proxy
let endpoint_cache_key = conn_info.endpoint_cache_key().unwrap();
let mut client = backend.connect_to_local_proxy(ctx, conn_info).await?;
let _schema_ = match &config.rest_config.db_schema_cache {
Some(cache) => {
// we need the AUTH header here which we already moved from the original request
let auth_header = req.headers_ref().unwrap().get(AUTHORIZATION).unwrap();
cache.get_remote(&endpoint_cache_key, auth_header, &connection_string, &mut client, &ctx).await?
}
None => {
return Err(RestError::SubzeroCore(InternalError {
message: "DB schema cache is not configured".to_string()
}));
}
};
req = req.header(&NEON_REQUEST_ID, uuid_to_header_value(ctx.session_id()));
req = req.header(&CONN_STRING, HeaderValue::from_str(connection_string).unwrap());