Compare commits

...

40 Commits

Author SHA1 Message Date
Ruslan Talpa
ee7d8e4512 revert pg-14 submodule changes 2025-07-04 13:55:35 +03:00
Ruslan Talpa
6549708b44 change subzero dep sha 2025-07-04 13:39:49 +03:00
Ruslan Talpa
45631bf2e5 add line to remove from diff 2025-07-04 13:28:52 +03:00
Ruslan Talpa
5dbca8c756 revert changes from original hack branch 2025-07-04 13:27:59 +03:00
Ruslan Talpa
9f46ca5eb1 Merge branch 'main' into ruslan/subzero-integration 2025-07-04 13:03:55 +03:00
Ruslan Talpa
54da030a2d place the entire rest_broker code under a feature flag 2025-07-04 12:46:48 +03:00
Ruslan Talpa
afa4e48071 put subzero dependency under a feature flag 2025-07-04 11:27:36 +03:00
Ruslan Talpa
b54872a4dc fix error after merging latest master 2025-07-03 17:24:13 +03:00
Ruslan Talpa
486829f875 Merge branch 'main' into ruslan/subzero-integration 2025-07-03 17:10:43 +03:00
Ruslan Talpa
4775aa3e01 Merge branch 'main' into ruslan/subzero-integration 2025-07-01 13:46:57 +03:00
Ruslan Talpa
1785f856b6 Move the local auth backend under the "testing" feature 2025-06-30 16:52:45 +03:00
Ruslan Talpa
69b22b05da add in readme the way to run auth/rest broker locally 2025-06-30 16:17:35 +03:00
Ruslan Talpa
bf0007fa96 add note about local confir read code 2025-06-30 16:12:04 +03:00
Ruslan Talpa
a9bbe7b00b remove unused imports 2025-06-30 16:02:30 +03:00
Ruslan Talpa
7e3f64b309 implement local auth backend for proxy and remove control plane hacks 2025-06-30 16:00:43 +03:00
Ruslan Talpa
9480d17de7 fix bug in pickcurrent_chema 2025-06-30 12:53:47 +03:00
Ruslan Talpa
424004ec95 apply cargo fmt 2025-06-30 12:32:47 +03:00
Ruslan Talpa
88d1a78260 cleanup the rest path code 2025-06-30 12:30:33 +03:00
Ruslan Talpa
8e544c7f99 import introspection queries instead of loading from files 2025-06-27 14:40:02 +03:00
Ruslan Talpa
4f49fc5b79 move common error types and http realted functions to error.rs and http_util.rs 2025-06-27 13:37:29 +03:00
Ruslan Talpa
5461039c3f implement remote config fetch from the db and cache introspected schema 2025-06-27 10:20:26 +03:00
Ruslan Talpa
d6c36d103e subzero integration WIP6
beginning work on introspection of config and schema shape from the database
2025-06-26 14:42:31 +03:00
Ruslan Talpa
fbb2416685 pg 14 vendor commit changed 2025-06-26 10:25:54 +03:00
Ruslan Talpa
8072fae2fe Merge branch 'main' into ruslan/subzero-integration 2025-06-26 10:19:16 +03:00
Ruslan Talpa
3869d680f9 use a global parsed/cached schema 2025-06-26 10:14:28 +03:00
Ruslan Talpa
d3fa228d92 move subzero local test files to a "dot" folder 2025-06-25 16:43:50 +03:00
Ruslan Talpa
be6a259b85 subzero integration WIP5
cleanup and postprocess the response and set the correct headers/status and handle errors
2025-06-25 14:33:45 +03:00
Ruslan Talpa
af3ca24a5e remove unused enum values 2025-06-25 14:32:46 +03:00
Ruslan Talpa
8b44f5b479 subzero integration WIP5
extract the response body from the local proxy response
2025-06-24 17:03:42 +03:00
Ruslan Talpa
d1445cf3eb subzero integration WIP4
queries generated by subzero reach database and execute succesfully
2025-06-24 15:33:51 +03:00
Ruslan Talpa
67d3026fc4 subzero integration WIP3
* query makes it to the database
2025-06-23 11:48:55 +03:00
Ruslan Talpa
09e62e9b98 subzero integration WIP2 2025-06-23 10:11:06 +03:00
Ruslan Talpa
e121da4bfc subzero integration WIP1 2025-06-20 15:10:45 +03:00
Ruslan Talpa
4a948c9781 add note about ICU lib missing on macs and the fix 2025-06-20 10:58:38 +03:00
Ruslan Talpa
b39f04ab99 add missing parts to make disable_pg_session_jwt flag work 2025-06-20 10:23:42 +03:00
Ruslan Talpa
6bd15908fb make pg_session_jwt instalation optional with a cli flag 2025-06-20 10:17:32 +03:00
Ruslan Talpa
3e36d516c2 vanilla pg dokcer image setup 2025-06-20 09:37:39 +03:00
Conrad Ludgate
cc3af6f7dd code for local setup of auth-broker 2025-06-20 09:37:39 +03:00
Conrad Ludgate
5badc7a3fb code for local setup of auth-broker 2025-06-19 10:34:09 +01:00
Conrad Ludgate
3a73644308 use cargo-chef for compute-tools 2025-06-19 09:24:53 +01:00
11 changed files with 3262 additions and 1318 deletions

5
.gitignore vendored
View File

@@ -25,6 +25,11 @@ compaction-suite-results.*
*.o
*.so
*.Po
*.pid
# pgindent typedef lists
*.list
# various files for local testing
/proxy/.subzero
local_proxy.json

3241
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -5,8 +5,9 @@ edition = "2024"
license.workspace = true
[features]
default = []
default = ["rest_broker"]
testing = ["dep:tokio-postgres"]
rest_broker = ["subzero-core", "jsonpath_lib", "ouroboros"]
[dependencies]
ahash.workspace = true
@@ -103,6 +104,9 @@ uuid.workspace = true
x509-cert.workspace = true
redis.workspace = true
zerocopy.workspace = true
subzero-core = { git = "https://github.com/neondatabase-labs/subzero", rev = "0b3d3278f5f9ac9311a7280cb1676de80e021f06", features = ["postgresql"], optional = true }
jsonpath_lib = { version = "0.3.0", optional = true }
ouroboros = { version = "0.18", optional = true }
# jwt stuff
jose-jwa = "0.1.2"

View File

@@ -170,8 +170,8 @@ Create a configuration file called `local_proxy.json` in the root of the repo (u
Start the local proxy:
```sh
cargo run --bin local_proxy -- \
--disable_pg_session_jwt true \
cargo run --bin local_proxy --features testing -- \
--disable-pg-session-jwt \
--http 0.0.0.0:7432
```
@@ -180,6 +180,7 @@ Start the auth broker:
LOGFMT=text OTEL_SDK_DISABLED=true cargo run --bin proxy --features testing -- \
-c server.crt -k server.key \
--is-auth-broker true \
--is-rest-broker true \
--wss 0.0.0.0:8080 \
--http 0.0.0.0:7002 \
--auth-backend local
@@ -197,3 +198,9 @@ curl -k "https://foo.local.neon.build:8080/sql" \
-H "neon-connection-string: postgresql://authenticator@foo.local.neon.build/database" \
-d '{"query":"select 1","params":[]}'
```
Make a rest request against the auth broker (rest broker):
```sh
curl -k "https://foo.local.neon.build:8080/database/rest/v1/items?select=id,name&id=eq.1" \
-H "Authorization: Bearer $NEON_JWT"
```

View File

@@ -20,6 +20,9 @@ use crate::auth::backend::jwt::JwkCache;
use crate::auth::backend::local::LocalBackend;
use crate::auth::{self};
use crate::cancellation::CancellationHandler;
#[cfg(feature = "rest_broker")]
use crate::config::RestConfig;
use crate::config::{
self, AuthenticationConfig, ComputeConfig, HttpConfig, ProxyConfig, RetryConfig,
refresh_config_loop,
@@ -276,6 +279,11 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
accept_jwts: true,
console_redirect_confirmation_timeout: Duration::ZERO,
},
#[cfg(feature = "rest_broker")]
rest_config: RestConfig {
is_rest_broker: false,
db_schema_cache: None,
},
proxy_protocol_v2: config::ProxyProtocolV2::Rejected,
handshake_timeout: Duration::from_secs(10),
wake_compute_retry_config: RetryConfig::parse(RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)?,

View File

@@ -31,6 +31,8 @@ use crate::auth::backend::local::LocalBackend;
use crate::auth::backend::{ConsoleRedirectBackend, MaybeOwned};
use crate::batch::BatchQueue;
use crate::cancellation::{CancellationHandler, CancellationProcessor};
#[cfg(feature = "rest_broker")]
use crate::config::RestConfig;
#[cfg(any(test, feature = "testing"))]
use crate::config::refresh_config_loop;
use crate::config::{
@@ -47,6 +49,8 @@ use crate::redis::{elasticache, notifications};
use crate::scram::threadpool::ThreadPool;
use crate::serverless::GlobalConnPoolOptions;
use crate::serverless::cancel_set::CancelSet;
#[cfg(feature = "rest_broker")]
use crate::serverless::rest::DbSchemaCache;
use crate::tls::client_config::compute_client_config_with_root_certs;
#[cfg(any(test, feature = "testing"))]
use crate::url::ApiUrl;
@@ -244,10 +248,12 @@ struct ProxyCliArgs {
/// if this is not local proxy, this toggles whether we accept Postgres REST requests
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
#[cfg(feature = "rest_broker")]
is_rest_broker: bool,
/// cache for `db_schema_cache` introspection (use `size=0` to disable)
#[clap(long, default_value = "size=1000,ttl=1h")]
#[cfg(feature = "rest_broker")]
db_schema_cache: String,
}
@@ -515,6 +521,17 @@ pub async fn run() -> anyhow::Result<()> {
));
maintenance_tasks.spawn(control_plane::mgmt::task_main(mgmt_listener));
// add a task to flush the db_schema cache every 10 minutes
#[cfg(feature = "rest_broker")]
if let Some(db_schema_cache) = &config.rest_config.db_schema_cache {
maintenance_tasks.spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(600)).await;
db_schema_cache.flush();
}
});
}
if let Some(metrics_config) = &config.metric_collection {
// TODO: Add gc regardles of the metric collection being enabled.
maintenance_tasks.spawn(usage_metrics::task_main(metrics_config));
@@ -682,6 +699,28 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
timeout: Duration::from_secs(2),
};
#[cfg(feature = "rest_broker")]
let rest_config = {
let db_schema_cache_config: CacheOptions = args.db_schema_cache.parse()?;
info!("Using DbSchemaCache with options={db_schema_cache_config:?}");
let db_schema_cache = if args.is_rest_broker {
Some(DbSchemaCache::new(
"db_schema_cache",
db_schema_cache_config.size,
db_schema_cache_config.ttl,
true,
))
} else {
None
};
RestConfig {
is_rest_broker: args.is_rest_broker,
db_schema_cache,
}
};
let config = ProxyConfig {
tls_config,
metric_collection,
@@ -694,6 +733,8 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
connect_to_compute: compute_config,
#[cfg(feature = "testing")]
disable_pg_session_jwt: false,
#[cfg(feature = "rest_broker")]
rest_config,
};
let config = Box::leak(Box::new(config));

View File

@@ -204,6 +204,10 @@ impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
self.insert_raw_ttl(key, value, ttl, false);
}
pub(crate) fn insert(&self, key: K, value: V) {
self.insert_raw_ttl(key, value, self.ttl, self.update_ttl_on_retrieval);
}
pub(crate) fn insert_unit(&self, key: K, value: V) -> (Option<V>, Cached<&Self, ()>) {
let (_, old) = self.insert_raw(key.clone(), value);
@@ -214,6 +218,28 @@ impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
(old, cached)
}
pub(crate) fn flush(&self) {
let now = Instant::now();
let mut cache = self.cache.lock();
// Collect keys of expired entries first
let expired_keys: Vec<_> = cache
.iter()
.filter_map(|(key, entry)| {
if entry.expires_at <= now {
Some(key.clone())
} else {
None
}
})
.collect();
// Remove expired entries
for key in expired_keys {
cache.remove(&key);
}
}
}
impl<K: Hash + Eq, V: Clone> TimedLru<K, V> {

View File

@@ -22,6 +22,8 @@ use crate::rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig}
use crate::scram::threadpool::ThreadPool;
use crate::serverless::GlobalConnPoolOptions;
use crate::serverless::cancel_set::CancelSet;
#[cfg(feature = "rest_broker")]
use crate::serverless::rest::DbSchemaCache;
pub use crate::tls::server_config::{TlsConfig, configure_tls};
use crate::types::{Host, RoleName};
@@ -30,6 +32,8 @@ pub struct ProxyConfig {
pub metric_collection: Option<MetricCollectionConfig>,
pub http_config: HttpConfig,
pub authentication_config: AuthenticationConfig,
#[cfg(feature = "rest_broker")]
pub rest_config: RestConfig,
pub proxy_protocol_v2: ProxyProtocolV2,
pub handshake_timeout: Duration,
pub wake_compute_retry_config: RetryConfig,
@@ -80,6 +84,12 @@ pub struct AuthenticationConfig {
pub console_redirect_confirmation_timeout: tokio::time::Duration,
}
#[cfg(feature = "rest_broker")]
pub struct RestConfig {
pub is_rest_broker: bool,
pub db_schema_cache: Option<DbSchemaCache>,
}
#[derive(Debug)]
pub struct EndpointCacheConfig {
/// Batch size to receive all endpoints on the startup.

View File

@@ -8,7 +8,7 @@ use std::time::Duration;
use clashmap::ClashMap;
use tokio::time::Instant;
use tracing::{debug, info};
use tracing::debug;
use super::{EndpointAccessControl, RoleAccessControl};
use crate::auth::backend::ComputeUserInfo;
@@ -234,7 +234,8 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
// temporary lock a single shard and then clear any semaphores that aren't currently checked out
// race conditions: if strong_count == 1, there's no way that it can increase while the shard is locked
// therefore releasing it is safe from race conditions
info!(
debug!(
//FIXME: is anything depending on this being info?
name = self.name,
shard = i,
"performing epoch reclamation on api lock"

View File

@@ -11,6 +11,8 @@ mod http_conn_pool;
mod http_util;
mod json;
mod local_conn_pool;
#[cfg(feature = "rest_broker")]
pub mod rest;
mod sql_over_http;
mod websocket;
@@ -487,6 +489,37 @@ async fn request_handler(
.body(Empty::new().map_err(|x| match x {}).boxed())
.map_err(|e| ApiError::InternalServerError(e.into()))
} else {
json_response(StatusCode::BAD_REQUEST, "query is not supported")
#[cfg(feature = "rest_broker")]
{
if config.rest_config.is_rest_broker && {
let path_parts: Vec<&str> = request.uri().path().split('/').collect();
path_parts.len() >= 3 && path_parts[2].starts_with("rest")
} {
let ctx =
RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Http);
let span = ctx.span();
let testodrome_id = request
.headers()
.get("X-Neon-Query-ID")
.and_then(|value| value.to_str().ok())
.map(|s| s.to_string());
if let Some(query_id) = testodrome_id {
info!(parent: &ctx.span(), "testodrome query ID: {query_id}");
ctx.set_testodrome_id(query_id.into());
}
rest::handle(config, ctx, request, backend, http_cancellation_token)
.instrument(span)
.await
} else {
json_response(StatusCode::BAD_REQUEST, "query is not supported")
}
}
#[cfg(not(feature = "rest_broker"))]
{
json_response(StatusCode::BAD_REQUEST, "query is not supported")
}
}
}

1192
proxy/src/serverless/rest.rs Normal file

File diff suppressed because it is too large Load Diff