mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
Compare commits
40 Commits
release-co
...
ruslan/sub
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ee7d8e4512 | ||
|
|
6549708b44 | ||
|
|
45631bf2e5 | ||
|
|
5dbca8c756 | ||
|
|
9f46ca5eb1 | ||
|
|
54da030a2d | ||
|
|
afa4e48071 | ||
|
|
b54872a4dc | ||
|
|
486829f875 | ||
|
|
4775aa3e01 | ||
|
|
1785f856b6 | ||
|
|
69b22b05da | ||
|
|
bf0007fa96 | ||
|
|
a9bbe7b00b | ||
|
|
7e3f64b309 | ||
|
|
9480d17de7 | ||
|
|
424004ec95 | ||
|
|
88d1a78260 | ||
|
|
8e544c7f99 | ||
|
|
4f49fc5b79 | ||
|
|
5461039c3f | ||
|
|
d6c36d103e | ||
|
|
fbb2416685 | ||
|
|
8072fae2fe | ||
|
|
3869d680f9 | ||
|
|
d3fa228d92 | ||
|
|
be6a259b85 | ||
|
|
af3ca24a5e | ||
|
|
8b44f5b479 | ||
|
|
d1445cf3eb | ||
|
|
67d3026fc4 | ||
|
|
09e62e9b98 | ||
|
|
e121da4bfc | ||
|
|
4a948c9781 | ||
|
|
b39f04ab99 | ||
|
|
6bd15908fb | ||
|
|
3e36d516c2 | ||
|
|
cc3af6f7dd | ||
|
|
5badc7a3fb | ||
|
|
3a73644308 |
5
.gitignore
vendored
5
.gitignore
vendored
@@ -25,6 +25,11 @@ compaction-suite-results.*
|
||||
*.o
|
||||
*.so
|
||||
*.Po
|
||||
*.pid
|
||||
|
||||
# pgindent typedef lists
|
||||
*.list
|
||||
|
||||
# various files for local testing
|
||||
/proxy/.subzero
|
||||
local_proxy.json
|
||||
|
||||
3241
Cargo.lock
generated
3241
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -5,8 +5,9 @@ edition = "2024"
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
default = ["rest_broker"]
|
||||
testing = ["dep:tokio-postgres"]
|
||||
rest_broker = ["subzero-core", "jsonpath_lib", "ouroboros"]
|
||||
|
||||
[dependencies]
|
||||
ahash.workspace = true
|
||||
@@ -103,6 +104,9 @@ uuid.workspace = true
|
||||
x509-cert.workspace = true
|
||||
redis.workspace = true
|
||||
zerocopy.workspace = true
|
||||
subzero-core = { git = "https://github.com/neondatabase-labs/subzero", rev = "0b3d3278f5f9ac9311a7280cb1676de80e021f06", features = ["postgresql"], optional = true }
|
||||
jsonpath_lib = { version = "0.3.0", optional = true }
|
||||
ouroboros = { version = "0.18", optional = true }
|
||||
|
||||
# jwt stuff
|
||||
jose-jwa = "0.1.2"
|
||||
|
||||
@@ -170,8 +170,8 @@ Create a configuration file called `local_proxy.json` in the root of the repo (u
|
||||
|
||||
Start the local proxy:
|
||||
```sh
|
||||
cargo run --bin local_proxy -- \
|
||||
--disable_pg_session_jwt true \
|
||||
cargo run --bin local_proxy --features testing -- \
|
||||
--disable-pg-session-jwt \
|
||||
--http 0.0.0.0:7432
|
||||
```
|
||||
|
||||
@@ -180,6 +180,7 @@ Start the auth broker:
|
||||
LOGFMT=text OTEL_SDK_DISABLED=true cargo run --bin proxy --features testing -- \
|
||||
-c server.crt -k server.key \
|
||||
--is-auth-broker true \
|
||||
--is-rest-broker true \
|
||||
--wss 0.0.0.0:8080 \
|
||||
--http 0.0.0.0:7002 \
|
||||
--auth-backend local
|
||||
@@ -197,3 +198,9 @@ curl -k "https://foo.local.neon.build:8080/sql" \
|
||||
-H "neon-connection-string: postgresql://authenticator@foo.local.neon.build/database" \
|
||||
-d '{"query":"select 1","params":[]}'
|
||||
```
|
||||
|
||||
Make a rest request against the auth broker (rest broker):
|
||||
```sh
|
||||
curl -k "https://foo.local.neon.build:8080/database/rest/v1/items?select=id,name&id=eq.1" \
|
||||
-H "Authorization: Bearer $NEON_JWT"
|
||||
```
|
||||
|
||||
@@ -20,6 +20,9 @@ use crate::auth::backend::jwt::JwkCache;
|
||||
use crate::auth::backend::local::LocalBackend;
|
||||
use crate::auth::{self};
|
||||
use crate::cancellation::CancellationHandler;
|
||||
|
||||
#[cfg(feature = "rest_broker")]
|
||||
use crate::config::RestConfig;
|
||||
use crate::config::{
|
||||
self, AuthenticationConfig, ComputeConfig, HttpConfig, ProxyConfig, RetryConfig,
|
||||
refresh_config_loop,
|
||||
@@ -276,6 +279,11 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
|
||||
accept_jwts: true,
|
||||
console_redirect_confirmation_timeout: Duration::ZERO,
|
||||
},
|
||||
#[cfg(feature = "rest_broker")]
|
||||
rest_config: RestConfig {
|
||||
is_rest_broker: false,
|
||||
db_schema_cache: None,
|
||||
},
|
||||
proxy_protocol_v2: config::ProxyProtocolV2::Rejected,
|
||||
handshake_timeout: Duration::from_secs(10),
|
||||
wake_compute_retry_config: RetryConfig::parse(RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)?,
|
||||
|
||||
@@ -31,6 +31,8 @@ use crate::auth::backend::local::LocalBackend;
|
||||
use crate::auth::backend::{ConsoleRedirectBackend, MaybeOwned};
|
||||
use crate::batch::BatchQueue;
|
||||
use crate::cancellation::{CancellationHandler, CancellationProcessor};
|
||||
#[cfg(feature = "rest_broker")]
|
||||
use crate::config::RestConfig;
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
use crate::config::refresh_config_loop;
|
||||
use crate::config::{
|
||||
@@ -47,6 +49,8 @@ use crate::redis::{elasticache, notifications};
|
||||
use crate::scram::threadpool::ThreadPool;
|
||||
use crate::serverless::GlobalConnPoolOptions;
|
||||
use crate::serverless::cancel_set::CancelSet;
|
||||
#[cfg(feature = "rest_broker")]
|
||||
use crate::serverless::rest::DbSchemaCache;
|
||||
use crate::tls::client_config::compute_client_config_with_root_certs;
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
use crate::url::ApiUrl;
|
||||
@@ -244,10 +248,12 @@ struct ProxyCliArgs {
|
||||
|
||||
/// if this is not local proxy, this toggles whether we accept Postgres REST requests
|
||||
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
|
||||
#[cfg(feature = "rest_broker")]
|
||||
is_rest_broker: bool,
|
||||
|
||||
/// cache for `db_schema_cache` introspection (use `size=0` to disable)
|
||||
#[clap(long, default_value = "size=1000,ttl=1h")]
|
||||
#[cfg(feature = "rest_broker")]
|
||||
db_schema_cache: String,
|
||||
}
|
||||
|
||||
@@ -515,6 +521,17 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
));
|
||||
maintenance_tasks.spawn(control_plane::mgmt::task_main(mgmt_listener));
|
||||
|
||||
// add a task to flush the db_schema cache every 10 minutes
|
||||
#[cfg(feature = "rest_broker")]
|
||||
if let Some(db_schema_cache) = &config.rest_config.db_schema_cache {
|
||||
maintenance_tasks.spawn(async move {
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(600)).await;
|
||||
db_schema_cache.flush();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(metrics_config) = &config.metric_collection {
|
||||
// TODO: Add gc regardles of the metric collection being enabled.
|
||||
maintenance_tasks.spawn(usage_metrics::task_main(metrics_config));
|
||||
@@ -682,6 +699,28 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
timeout: Duration::from_secs(2),
|
||||
};
|
||||
|
||||
#[cfg(feature = "rest_broker")]
|
||||
let rest_config = {
|
||||
let db_schema_cache_config: CacheOptions = args.db_schema_cache.parse()?;
|
||||
info!("Using DbSchemaCache with options={db_schema_cache_config:?}");
|
||||
|
||||
let db_schema_cache = if args.is_rest_broker {
|
||||
Some(DbSchemaCache::new(
|
||||
"db_schema_cache",
|
||||
db_schema_cache_config.size,
|
||||
db_schema_cache_config.ttl,
|
||||
true,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
RestConfig {
|
||||
is_rest_broker: args.is_rest_broker,
|
||||
db_schema_cache,
|
||||
}
|
||||
};
|
||||
|
||||
let config = ProxyConfig {
|
||||
tls_config,
|
||||
metric_collection,
|
||||
@@ -694,6 +733,8 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
connect_to_compute: compute_config,
|
||||
#[cfg(feature = "testing")]
|
||||
disable_pg_session_jwt: false,
|
||||
#[cfg(feature = "rest_broker")]
|
||||
rest_config,
|
||||
};
|
||||
|
||||
let config = Box::leak(Box::new(config));
|
||||
|
||||
26
proxy/src/cache/timed_lru.rs
vendored
26
proxy/src/cache/timed_lru.rs
vendored
@@ -204,6 +204,10 @@ impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
|
||||
self.insert_raw_ttl(key, value, ttl, false);
|
||||
}
|
||||
|
||||
pub(crate) fn insert(&self, key: K, value: V) {
|
||||
self.insert_raw_ttl(key, value, self.ttl, self.update_ttl_on_retrieval);
|
||||
}
|
||||
|
||||
pub(crate) fn insert_unit(&self, key: K, value: V) -> (Option<V>, Cached<&Self, ()>) {
|
||||
let (_, old) = self.insert_raw(key.clone(), value);
|
||||
|
||||
@@ -214,6 +218,28 @@ impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
|
||||
|
||||
(old, cached)
|
||||
}
|
||||
|
||||
pub(crate) fn flush(&self) {
|
||||
let now = Instant::now();
|
||||
let mut cache = self.cache.lock();
|
||||
|
||||
// Collect keys of expired entries first
|
||||
let expired_keys: Vec<_> = cache
|
||||
.iter()
|
||||
.filter_map(|(key, entry)| {
|
||||
if entry.expires_at <= now {
|
||||
Some(key.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Remove expired entries
|
||||
for key in expired_keys {
|
||||
cache.remove(&key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<K: Hash + Eq, V: Clone> TimedLru<K, V> {
|
||||
|
||||
@@ -22,6 +22,8 @@ use crate::rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig}
|
||||
use crate::scram::threadpool::ThreadPool;
|
||||
use crate::serverless::GlobalConnPoolOptions;
|
||||
use crate::serverless::cancel_set::CancelSet;
|
||||
#[cfg(feature = "rest_broker")]
|
||||
use crate::serverless::rest::DbSchemaCache;
|
||||
pub use crate::tls::server_config::{TlsConfig, configure_tls};
|
||||
use crate::types::{Host, RoleName};
|
||||
|
||||
@@ -30,6 +32,8 @@ pub struct ProxyConfig {
|
||||
pub metric_collection: Option<MetricCollectionConfig>,
|
||||
pub http_config: HttpConfig,
|
||||
pub authentication_config: AuthenticationConfig,
|
||||
#[cfg(feature = "rest_broker")]
|
||||
pub rest_config: RestConfig,
|
||||
pub proxy_protocol_v2: ProxyProtocolV2,
|
||||
pub handshake_timeout: Duration,
|
||||
pub wake_compute_retry_config: RetryConfig,
|
||||
@@ -80,6 +84,12 @@ pub struct AuthenticationConfig {
|
||||
pub console_redirect_confirmation_timeout: tokio::time::Duration,
|
||||
}
|
||||
|
||||
#[cfg(feature = "rest_broker")]
|
||||
pub struct RestConfig {
|
||||
pub is_rest_broker: bool,
|
||||
pub db_schema_cache: Option<DbSchemaCache>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct EndpointCacheConfig {
|
||||
/// Batch size to receive all endpoints on the startup.
|
||||
|
||||
@@ -8,7 +8,7 @@ use std::time::Duration;
|
||||
|
||||
use clashmap::ClashMap;
|
||||
use tokio::time::Instant;
|
||||
use tracing::{debug, info};
|
||||
use tracing::debug;
|
||||
|
||||
use super::{EndpointAccessControl, RoleAccessControl};
|
||||
use crate::auth::backend::ComputeUserInfo;
|
||||
@@ -234,7 +234,8 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
// temporary lock a single shard and then clear any semaphores that aren't currently checked out
|
||||
// race conditions: if strong_count == 1, there's no way that it can increase while the shard is locked
|
||||
// therefore releasing it is safe from race conditions
|
||||
info!(
|
||||
debug!(
|
||||
//FIXME: is anything depending on this being info?
|
||||
name = self.name,
|
||||
shard = i,
|
||||
"performing epoch reclamation on api lock"
|
||||
|
||||
@@ -11,6 +11,8 @@ mod http_conn_pool;
|
||||
mod http_util;
|
||||
mod json;
|
||||
mod local_conn_pool;
|
||||
#[cfg(feature = "rest_broker")]
|
||||
pub mod rest;
|
||||
mod sql_over_http;
|
||||
mod websocket;
|
||||
|
||||
@@ -487,6 +489,37 @@ async fn request_handler(
|
||||
.body(Empty::new().map_err(|x| match x {}).boxed())
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))
|
||||
} else {
|
||||
json_response(StatusCode::BAD_REQUEST, "query is not supported")
|
||||
#[cfg(feature = "rest_broker")]
|
||||
{
|
||||
if config.rest_config.is_rest_broker && {
|
||||
let path_parts: Vec<&str> = request.uri().path().split('/').collect();
|
||||
path_parts.len() >= 3 && path_parts[2].starts_with("rest")
|
||||
} {
|
||||
let ctx =
|
||||
RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Http);
|
||||
let span = ctx.span();
|
||||
|
||||
let testodrome_id = request
|
||||
.headers()
|
||||
.get("X-Neon-Query-ID")
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(|s| s.to_string());
|
||||
|
||||
if let Some(query_id) = testodrome_id {
|
||||
info!(parent: &ctx.span(), "testodrome query ID: {query_id}");
|
||||
ctx.set_testodrome_id(query_id.into());
|
||||
}
|
||||
|
||||
rest::handle(config, ctx, request, backend, http_cancellation_token)
|
||||
.instrument(span)
|
||||
.await
|
||||
} else {
|
||||
json_response(StatusCode::BAD_REQUEST, "query is not supported")
|
||||
}
|
||||
}
|
||||
#[cfg(not(feature = "rest_broker"))]
|
||||
{
|
||||
json_response(StatusCode::BAD_REQUEST, "query is not supported")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
1192
proxy/src/serverless/rest.rs
Normal file
1192
proxy/src/serverless/rest.rs
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user