Compare commits

...

28 Commits

Author SHA1 Message Date
Conrad Ludgate
45232ca632 Merge remote-tracking branch 'origin/local-proxy-lazy-ext-install' into test-local-proxy-jwt-ext-install 2024-10-17 12:39:22 +01:00
Conrad Ludgate
e3385a6436 Merge branch 'main' into local-proxy-lazy-ext-install 2024-10-17 12:38:51 +01:00
Conrad Ludgate
7e7976b5cf Merge remote-tracking branch 'origin/grants-endpoint' into test-local-proxy-jwt-ext-install 2024-10-17 11:52:56 +01:00
Conrad Ludgate
eda964f7c4 do not quote privileges 2024-10-17 11:50:12 +01:00
Conrad Ludgate
815d499b14 update version 2024-10-17 10:16:45 +01:00
Conrad Ludgate
8a7ed8ce72 Merge remote-tracking branch 'origin/grants-endpoint' into test-local-proxy-jwt-ext-install 2024-10-17 10:15:37 +01:00
Conrad Ludgate
3f3cb744f0 Merge remote-tracking branch 'origin/install-extensions-endpoint' into test-local-proxy-jwt-ext-install 2024-10-17 10:13:28 +01:00
Conrad Ludgate
d0166ee736 Merge remote-tracking branch 'origin/local-proxy-lazy-ext-install' into test-local-proxy-jwt-ext-install 2024-10-17 10:13:19 +01:00
Conrad Ludgate
284c84da5c refactor 2024-10-17 10:11:58 +01:00
Conrad Ludgate
94204dbbae [local_proxy]: install pg_session_jwt extension on demand 2024-10-17 10:10:24 +01:00
Conrad Ludgate
6de90c7ce4 quote identifiers 2024-10-16 12:09:25 +01:00
Conrad Ludgate
59d197f6f6 use pg_quote 2024-10-16 12:05:27 +01:00
Conrad Ludgate
028f5291f8 call out to quote_ident 2024-10-16 11:55:37 +01:00
Jere Vaara
f105dc9397 Make install extensions fn async 2024-10-16 13:32:11 +03:00
Jere Vaara
603ca192a7 Change path as per Alexey's feedback 2024-10-16 13:32:11 +03:00
Jere Vaara
36bffe5ff3 Install required version or update 2024-10-16 13:32:11 +03:00
Jere Vaara
133ea2f31f Require extension version parameter 2024-10-16 13:32:11 +03:00
Jere Vaara
43400dfdea change path 2024-10-16 13:32:11 +03:00
Jere Vaara
2b5d79f625 Respond with extension name and version 2024-10-16 13:32:11 +03:00
Jere Vaara
a766ac3dc7 Add error response status code 2024-10-16 13:32:11 +03:00
Jere Vaara
04f5807b99 Add endpoint that allows extensions to be installed 2024-10-16 13:32:11 +03:00
Jere Vaara
b57f2c60f2 Make set role grants fn async 2024-10-16 13:25:19 +03:00
Jere Vaara
d0ca79aeb3 Use serde to serialize/deserialize Privilege instead of manual 2024-10-15 17:18:59 +03:00
Jere Vaara
39e0e31605 Move privilege to compute api 2024-10-15 16:21:52 +03:00
Jere Vaara
e495b99abe Add to_string for privilege 2024-10-15 16:18:06 +03:00
Jere Vaara
3f92477af0 Use string formatted query instead 2024-10-15 16:18:06 +03:00
Jere Vaara
c6c29f86da Handle privileges more strictly 2024-10-15 16:18:06 +03:00
Jere Vaara
4e15a68ffd Add endpoint to set role grants 2024-10-15 16:18:06 +03:00
15 changed files with 617 additions and 43 deletions

View File

@@ -978,8 +978,8 @@ ARG PG_VERSION
RUN case "${PG_VERSION}" in "v17") \
echo "pg_session_jwt does not yet have a release that supports pg17" && exit 0;; \
esac && \
wget https://github.com/neondatabase/pg_session_jwt/archive/5aee2625af38213650e1a07ae038fdc427250ee4.tar.gz -O pg_session_jwt.tar.gz && \
echo "5d91b10bc1347d36cffc456cb87bec25047935d6503dc652ca046f04760828e7 pg_session_jwt.tar.gz" | sha256sum --check && \
wget https://github.com/neondatabase/pg_session_jwt/archive/1c79c014c4c225c8684dc24a88369e79b4dbe762.tar.gz -O pg_session_jwt.tar.gz && \
echo "bc04b25626a88580b6fed1b87f45ba0a7ca66dbac003a3ec378a1a21b1456d8b pg_session_jwt.tar.gz" | sha256sum --check && \
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "=0.11.3"/pgrx = { version = "=0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgrx install --release

View File

@@ -25,6 +25,7 @@ use tracing::{debug, error, info, instrument, warn};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use compute_api::privilege::Privilege;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec};
use utils::measured_stream::MeasuredReader;
@@ -1367,6 +1368,96 @@ LIMIT 100",
download_size
}
pub async fn install_extension(
&self,
ext_name: &str,
db_name: &str,
ext_version: &str,
) -> Result<String> {
use tokio_postgres::config::Config;
use tokio_postgres::NoTls;
let mut conf = Config::from_str(self.connstr.as_str()).unwrap();
conf.dbname(db_name);
let (db_client, conn) = conf
.connect(NoTls)
.await
.context("Failed to connect to the database")?;
tokio::spawn(conn);
let version_query = "SELECT extversion FROM pg_extension WHERE extname = $1";
let version: Option<String> = db_client
.query_opt(version_query, &[&ext_name])
.await
.with_context(|| format!("Failed to execute query: {}", version_query))?
.map(|row| row.get(0));
// sanitize the inputs as postgres idents.
let ext_name: String = ext_name.to_string().pg_quote();
let ext_version: String = ext_version.to_string().pg_quote();
if let Some(installed_version) = version {
if installed_version == ext_version {
return Ok(installed_version);
}
let query = format!("ALTER EXTENSION {ext_name} UPDATE TO {ext_version}");
db_client
.simple_query(&query)
.await
.context(format!("Failed to execute query: {}", query))?;
} else {
let query =
format!("CREATE EXTENSION IF NOT EXISTS {ext_name} WITH VERSION {ext_version}");
db_client
.simple_query(&query)
.await
.context(format!("Failed to execute query: {}", query))?;
}
Ok(ext_version.to_string())
}
pub async fn set_role_grants(
&self,
db_name: &str,
schema_name: &str,
privileges: &[Privilege],
role_name: &str,
) -> Result<()> {
use tokio_postgres::config::Config;
use tokio_postgres::NoTls;
let mut conf = Config::from_str(self.connstr.as_str()).unwrap();
conf.dbname(db_name);
let (db_client, conn) = conf
.connect(NoTls)
.await
.context("Failed to connect to the database")?;
tokio::spawn(conn);
let query = format!(
"GRANT {} ON SCHEMA {} TO {}",
privileges
.iter()
// should not be quoted as it's part of the command.
// is already sanitized so it's ok
.map(|p| p.as_str())
.collect::<Vec<&'static str>>()
.join(", "),
// quote the schema and role name as identifiers to sanitize them.
schema_name.to_string().pg_quote(),
role_name.to_string().pg_quote(),
);
db_client
.simple_query(&query)
.await
.context(format!("Failed to execute query: {}", query))?;
Ok(())
}
#[tokio::main]
pub async fn prepare_preload_libraries(
&self,

View File

@@ -9,8 +9,13 @@ use crate::catalog::SchemaDumpError;
use crate::catalog::{get_database_schema, get_dbs_and_roles};
use crate::compute::forward_termination_signal;
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
use compute_api::requests::{
ExtensionInstallRequest, {ConfigurationRequest, SetRoleGrantsRequest},
};
use compute_api::responses::{
ComputeStatus, ComputeStatusResponse, ExtensionInstallResult, GenericAPIError,
SetRoleGrantsResponse,
};
use anyhow::Result;
use hyper::header::CONTENT_TYPE;
@@ -98,6 +103,38 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
(&Method::POST, "/extensions") => {
info!("serving /extensions POST request");
let status = compute.get_status();
if status != ComputeStatus::Running {
let msg = format!(
"invalid compute status for extensions request: {:?}",
status
);
error!(msg);
return Response::new(Body::from(msg));
}
let request = hyper::body::to_bytes(req.into_body()).await.unwrap();
let request = serde_json::from_slice::<ExtensionInstallRequest>(&request).unwrap();
let res = compute
.install_extension(&request.extension, &request.database, &request.version)
.await;
match res {
Ok(version) => render_json(Body::from(
serde_json::to_string(&ExtensionInstallResult {
extension: request.extension,
version,
})
.unwrap(),
)),
Err(e) => {
error!("install_extension failed: {}", e);
render_json_error(&e.to_string(), StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
(&Method::GET, "/info") => {
let num_cpus = num_cpus::get_physical();
info!("serving /info GET request. num_cpus: {}", num_cpus);
@@ -165,6 +202,46 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
(&Method::POST, "/grants") => {
info!("serving /grants POST request");
let status = compute.get_status();
if status != ComputeStatus::Running {
let msg = format!(
"invalid compute status for set_role_grants request: {:?}",
status
);
error!(msg);
return Response::new(Body::from(msg));
}
let request = hyper::body::to_bytes(req.into_body()).await.unwrap();
let request = serde_json::from_slice::<SetRoleGrantsRequest>(&request).unwrap();
let res = compute
.set_role_grants(
&request.database,
&request.schema,
&request.privileges,
&request.role,
)
.await;
match res {
Ok(()) => render_json(Body::from(
serde_json::to_string(&SetRoleGrantsResponse {
database: request.database,
schema: request.schema,
role: request.role,
privileges: request.privileges,
})
.unwrap(),
)),
Err(e) => {
error!("set_role_grants failed: {}", e);
Response::new(Body::from(e.to_string()))
}
}
}
// get the list of installed extensions
// currently only used in python tests
// TODO: call it from cplane

View File

@@ -10,7 +10,7 @@ paths:
/status:
get:
tags:
- Info
- Info
summary: Get compute node internal status.
description: ""
operationId: getComputeStatus
@@ -25,7 +25,7 @@ paths:
/metrics.json:
get:
tags:
- Info
- Info
summary: Get compute node startup metrics in JSON format.
description: ""
operationId: getComputeMetricsJSON
@@ -40,7 +40,7 @@ paths:
/insights:
get:
tags:
- Info
- Info
summary: Get current compute insights in JSON format.
description: |
Note, that this doesn't include any historical data.
@@ -56,7 +56,7 @@ paths:
/installed_extensions:
get:
tags:
- Info
- Info
summary: Get installed extensions.
description: ""
operationId: getInstalledExtensions
@@ -70,7 +70,7 @@ paths:
/info:
get:
tags:
- Info
- Info
summary: Get info about the compute pod / VM.
description: ""
operationId: getInfo
@@ -127,10 +127,38 @@ paths:
schema:
$ref: "#/components/schemas/GenericError"
/grants:
post:
tags:
- Grants
summary: Apply grants to the database.
description: ""
operationId: setRoleGrants
requestBody:
description: Grants request.
required: true
content:
application/json:
schema:
$ref: SetRoleGrantsRequest
responses:
200:
description: Grants applied.
content:
application/json:
schema:
$ref: "#/components/schemas/SetRoleGrantsResponse"
500:
description: Error occurred during grants application.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
/check_writability:
post:
tags:
- Check
- Check
summary: Check that we can write new data on this compute.
description: ""
operationId: checkComputeWritability
@@ -144,10 +172,38 @@ paths:
description: Error text or 'true' if check passed.
example: "true"
/extensions:
post:
tags:
- Extensions
summary: Install extension if possible.
description: ""
operationId: installExtension
requestBody:
description: Extension name and database to install it to.
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/ExtensionInstallRequest"
responses:
200:
description: Result from extension installation
content:
application/json:
schema:
$ref: "#/components/schemas/ExtensionInstallResult"
500:
description: Error during extension installation.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
/configure:
post:
tags:
- Configure
- Configure
summary: Perform compute node configuration.
description: |
This is a blocking API endpoint, i.e. it blocks waiting until
@@ -201,7 +257,7 @@ paths:
/extension_server:
post:
tags:
- Extension
- Extension
summary: Download extension from S3 to local folder.
description: ""
operationId: downloadExtension
@@ -230,7 +286,7 @@ paths:
/terminate:
post:
tags:
- Terminate
- Terminate
summary: Terminate Postgres and wait for it to exit
description: ""
operationId: terminate
@@ -369,7 +425,7 @@ components:
moment, when spec was received.
example: "2022-10-12T07:20:50.52Z"
status:
$ref: '#/components/schemas/ComputeStatus'
$ref: "#/components/schemas/ComputeStatus"
last_active:
type: string
description: |
@@ -409,6 +465,38 @@ components:
- configuration
example: running
ExtensionInstallRequest:
type: object
required:
- extension
- database
- version
properties:
extension:
type: string
description: Extension name.
example: "pg_session_jwt"
version:
type: string
description: Version of the extension.
example: "1.0.0"
database:
type: string
description: Database name.
example: "neondb"
ExtensionInstallResult:
type: object
properties:
extension:
description: Name of the extension.
type: string
example: "pg_session_jwt"
version:
description: Version of the extension.
type: string
example: "1.0.0"
InstalledExtensions:
type: object
properties:
@@ -427,6 +515,60 @@ components:
n_databases:
type: integer
SetRoleGrantsRequest:
type: object
required:
- database
- schema
- privileges
- role
properties:
database:
type: string
description: Database name.
example: "neondb"
schema:
type: string
description: Schema name.
example: "public"
privileges:
type: array
items:
type: string
description: List of privileges to set.
example: ["SELECT", "INSERT"]
role:
type: string
description: Role name.
example: "neon"
SetRoleGrantsResponse:
type: object
required:
- database
- schema
- privileges
- role
properties:
database:
type: string
description: Database name.
example: "neondb"
schema:
type: string
description: Schema name.
example: "public"
privileges:
type: array
items:
type: string
description: List of privileges set.
example: ["SELECT", "INSERT"]
role:
type: string
description: Role name.
example: "neon"
#
# Errors
#

View File

@@ -1,5 +1,6 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod privilege;
pub mod requests;
pub mod responses;
pub mod spec;

View File

@@ -0,0 +1,35 @@
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum Privilege {
Select,
Insert,
Update,
Delete,
Truncate,
References,
Trigger,
Usage,
Create,
Connect,
Temporary,
Execute,
}
impl Privilege {
pub fn as_str(&self) -> &'static str {
match self {
Privilege::Select => "SELECT",
Privilege::Insert => "INSERT",
Privilege::Update => "UPDATE",
Privilege::Delete => "DELETE",
Privilege::Truncate => "TRUNCATE",
Privilege::References => "REFERENCES",
Privilege::Trigger => "TRIGGER",
Privilege::Usage => "USAGE",
Privilege::Create => "CREATE",
Privilege::Connect => "CONNECT",
Privilege::Temporary => "TEMPORARY",
Privilege::Execute => "EXECUTE",
}
}
}

View File

@@ -1,6 +1,6 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use crate::spec::ComputeSpec;
use crate::{privilege::Privilege, spec::ComputeSpec};
use serde::Deserialize;
/// Request of the /configure API
@@ -12,3 +12,18 @@ use serde::Deserialize;
pub struct ConfigurationRequest {
pub spec: ComputeSpec,
}
#[derive(Deserialize, Debug)]
pub struct ExtensionInstallRequest {
pub extension: String,
pub database: String,
pub version: String,
}
#[derive(Deserialize, Debug)]
pub struct SetRoleGrantsRequest {
pub database: String,
pub schema: String,
pub privileges: Vec<Privilege>,
pub role: String,
}

View File

@@ -6,7 +6,10 @@ use std::fmt::Display;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize, Serializer};
use crate::spec::{ComputeSpec, Database, Role};
use crate::{
privilege::Privilege,
spec::{ComputeSpec, Database, Role},
};
#[derive(Serialize, Debug, Deserialize)]
pub struct GenericAPIError {
@@ -168,3 +171,17 @@ pub struct InstalledExtension {
pub struct InstalledExtensions {
pub extensions: Vec<InstalledExtension>,
}
#[derive(Clone, Debug, Default, Serialize)]
pub struct ExtensionInstallResult {
pub extension: String,
pub version: String,
}
#[derive(Clone, Debug, Default, Serialize)]
pub struct SetRoleGrantsResponse {
pub database: String,
pub schema: String,
pub privileges: Vec<Privilege>,
pub role: String,
}

View File

@@ -1,23 +1,32 @@
use std::net::SocketAddr;
use arc_swap::ArcSwapOption;
use tokio::sync::Semaphore;
use super::jwt::{AuthRule, FetchAuthRules};
use crate::auth::backend::jwt::FetchAuthRulesError;
use crate::compute::ConnCfg;
use crate::compute_ctl::ComputeCtlApi;
use crate::context::RequestMonitoring;
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo};
use crate::control_plane::NodeInfo;
use crate::intern::{BranchIdTag, EndpointIdTag, InternId, ProjectIdTag};
use crate::EndpointId;
use crate::url::ApiUrl;
use crate::{http, EndpointId};
pub struct LocalBackend {
pub(crate) initialize: Semaphore,
pub(crate) compute_ctl: ComputeCtlApi,
pub(crate) node_info: NodeInfo,
}
impl LocalBackend {
pub fn new(postgres_addr: SocketAddr) -> Self {
pub fn new(postgres_addr: SocketAddr, compute_ctl: ApiUrl) -> Self {
LocalBackend {
initialize: Semaphore::new(1),
compute_ctl: ComputeCtlApi {
api: http::Endpoint::new(compute_ctl, http::new_client()),
},
node_info: NodeInfo {
config: {
let mut cfg = ConnCfg::new();

View File

@@ -25,6 +25,7 @@ use proxy::rate_limiter::{
use proxy::scram::threadpool::ThreadPool;
use proxy::serverless::cancel_set::CancelSet;
use proxy::serverless::{self, GlobalConnPoolOptions};
use proxy::url::ApiUrl;
use proxy::RoleName;
project_git_version!(GIT_VERSION);
@@ -80,7 +81,10 @@ struct LocalProxyCliArgs {
connect_to_compute_retry: String,
/// Address of the postgres server
#[clap(long, default_value = "127.0.0.1:5432")]
compute: SocketAddr,
postgres: SocketAddr,
/// Address of the compute-ctl api service
#[clap(long, default_value = "http://127.0.0.1:3080/")]
compute_ctl: ApiUrl,
/// Path of the local proxy config file
#[clap(long, default_value = "./local_proxy.json")]
config_path: Utf8PathBuf,
@@ -295,7 +299,7 @@ fn build_auth_backend(
args: &LocalProxyCliArgs,
) -> anyhow::Result<&'static auth::Backend<'static, ()>> {
let auth_backend = proxy::auth::Backend::Local(proxy::auth::backend::MaybeOwned::Owned(
LocalBackend::new(args.compute),
LocalBackend::new(args.postgres, args.compute_ctl.clone()),
));
Ok(Box::leak(Box::new(auth_backend)))

View File

@@ -0,0 +1,101 @@
use compute_api::responses::GenericAPIError;
use hyper::{Method, StatusCode};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use crate::url::ApiUrl;
use crate::{http, DbName, RoleName};
pub struct ComputeCtlApi {
pub(crate) api: http::Endpoint,
}
#[derive(Serialize, Debug)]
pub struct ExtensionInstallRequest {
pub extension: &'static str,
pub database: DbName,
pub version: &'static str,
}
#[derive(Serialize, Debug)]
pub struct SetRoleGrantsRequest {
pub database: DbName,
pub schema: &'static str,
pub privileges: Vec<Privilege>,
pub role: RoleName,
}
#[derive(Clone, Debug, Deserialize)]
pub struct ExtensionInstallResponse {}
#[derive(Clone, Debug, Deserialize)]
pub struct SetRoleGrantsResponse {}
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
#[serde(rename_all = "UPPERCASE")]
pub enum Privilege {
Usage,
}
#[derive(Error, Debug)]
pub enum ComputeCtlError {
#[error("connection error: {0}")]
ConnectionError(#[source] reqwest_middleware::Error),
#[error("request error [{status}]: {body:?}")]
RequestError {
status: StatusCode,
body: Option<GenericAPIError>,
},
#[error("response parsing error: {0}")]
ResonseError(#[source] reqwest::Error),
}
impl ComputeCtlApi {
pub async fn install_extension(
&self,
req: &ExtensionInstallRequest,
) -> Result<ExtensionInstallResponse, ComputeCtlError> {
self.generic_request(req, Method::POST, |url| {
url.path_segments_mut().push("extensions");
})
.await
}
pub async fn grant_role(
&self,
req: &SetRoleGrantsRequest,
) -> Result<SetRoleGrantsResponse, ComputeCtlError> {
self.generic_request(req, Method::POST, |url| {
url.path_segments_mut().push("grants");
})
.await
}
async fn generic_request<Req, Resp>(
&self,
req: &Req,
method: Method,
url: impl for<'a> FnOnce(&'a mut ApiUrl),
) -> Result<Resp, ComputeCtlError>
where
Req: Serialize,
Resp: DeserializeOwned,
{
let resp = self
.api
.request_with_url(method, url)
.json(req)
.send()
.await
.map_err(ComputeCtlError::ConnectionError)?;
let status = resp.status();
if status.is_client_error() || status.is_server_error() {
let body = resp.json().await.ok();
return Err(ComputeCtlError::RequestError { status, body });
}
resp.json().await.map_err(ComputeCtlError::ResonseError)
}
}

View File

@@ -8,6 +8,7 @@ use std::time::Duration;
use anyhow::bail;
use bytes::Bytes;
use http::Method;
use http_body_util::BodyExt;
use hyper::body::Body;
pub(crate) use reqwest::{Request, Response};
@@ -93,9 +94,19 @@ impl Endpoint {
/// Return a [builder](RequestBuilder) for a `GET` request,
/// accepting a closure to modify the url path segments for more complex paths queries.
pub(crate) fn get_with_url(&self, f: impl for<'a> FnOnce(&'a mut ApiUrl)) -> RequestBuilder {
self.request_with_url(Method::GET, f)
}
/// Return a [builder](RequestBuilder) for a request,
/// accepting a closure to modify the url path segments for more complex paths queries.
pub(crate) fn request_with_url(
&self,
method: Method,
f: impl for<'a> FnOnce(&'a mut ApiUrl),
) -> RequestBuilder {
let mut url = self.endpoint.clone();
f(&mut url);
self.client.get(url.into_inner())
self.client.request(method, url.into_inner())
}
/// Execute a [request](reqwest::Request).

View File

@@ -94,6 +94,7 @@ pub mod auth;
pub mod cache;
pub mod cancellation;
pub mod compute;
pub mod compute_ctl;
pub mod config;
pub mod console_redirect_proxy;
pub mod context;

View File

@@ -14,10 +14,13 @@ use tracing::{debug, info};
use super::conn_pool::poll_client;
use super::conn_pool_lib::{Client, ConnInfo, GlobalConnPool};
use super::http_conn_pool::{self, poll_http2_client, Send};
use super::local_conn_pool::{self, LocalClient, LocalConnPool};
use super::local_conn_pool::{self, LocalClient, LocalConnPool, EXT_NAME, EXT_SCHEMA, EXT_VERSION};
use crate::auth::backend::local::StaticAuthRules;
use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
use crate::auth::{self, check_peer_addr_is_in_list, AuthError};
use crate::compute_ctl::{
ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
};
use crate::config::ProxyConfig;
use crate::context::RequestMonitoring;
use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
@@ -35,6 +38,7 @@ pub(crate) struct PoolingBackend {
pub(crate) http_conn_pool: Arc<super::http_conn_pool::GlobalConnPool<Send>>,
pub(crate) local_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
pub(crate) pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
pub(crate) config: &'static ProxyConfig,
pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
pub(crate) endpoint_rate_limiter: Arc<EndpointRateLimiter>,
@@ -250,16 +254,50 @@ impl PoolingBackend {
return Ok(client);
}
let local_backend = match &self.auth_backend {
auth::Backend::ControlPlane(_, ()) => {
unreachable!("only local_proxy can connect to local postgres")
}
auth::Backend::Local(local) => local,
};
#[allow(unreachable_code, clippy::todo)]
if !self.local_pool.initialized(&conn_info) {
// only install and grant usage one at a time.
let _permit = local_backend.initialize.acquire().await.unwrap();
// check again for race
if !self.local_pool.initialized(&conn_info) {
local_backend
.compute_ctl
.install_extension(&ExtensionInstallRequest {
extension: EXT_NAME,
database: conn_info.dbname.clone(),
// todo: move to const or config
version: EXT_VERSION,
})
.await?;
local_backend
.compute_ctl
.grant_role(&SetRoleGrantsRequest {
// fixed for pg_session_jwt
schema: EXT_SCHEMA,
privileges: vec![Privilege::Usage],
database: conn_info.dbname.clone(),
role: conn_info.user_info.user.clone(),
})
.await?;
self.local_pool.set_initialized(&conn_info);
}
}
let conn_id = uuid::Uuid::new_v4();
tracing::Span::current().record("conn_id", display(conn_id));
info!(%conn_id, "local_pool: opening a new connection '{conn_info}'");
let mut node_info = match &self.auth_backend {
auth::Backend::ControlPlane(_, ()) => {
unreachable!("only local_proxy can connect to local postgres")
}
auth::Backend::Local(local) => local.node_info.clone(),
};
let mut node_info = local_backend.node_info.clone();
let (key, jwk) = create_random_jwk();
@@ -324,6 +362,8 @@ pub(crate) enum HttpConnError {
#[error("could not parse JWT payload")]
JwtPayloadError(serde_json::Error),
#[error("could not install extension: {0}")]
ComputeCtl(#[from] ComputeCtlError),
#[error("could not get auth info")]
GetAuthInfo(#[from] GetAuthInfoError),
#[error("user not authenticated")]
@@ -348,6 +388,7 @@ impl ReportableError for HttpConnError {
HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
HttpConnError::PostgresConnectionError(p) => p.get_error_kind(),
HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute,
HttpConnError::ComputeCtl(_) => ErrorKind::Service,
HttpConnError::JwtPayloadError(_) => ErrorKind::User,
HttpConnError::GetAuthInfo(a) => a.get_error_kind(),
HttpConnError::AuthError(a) => a.get_error_kind(),
@@ -363,6 +404,7 @@ impl UserFacingError for HttpConnError {
HttpConnError::ConnectionClosedAbruptly(_) => self.to_string(),
HttpConnError::PostgresConnectionError(p) => p.to_string(),
HttpConnError::LocalProxyConnectionError(p) => p.to_string(),
HttpConnError::ComputeCtl(_) => "could not set up the JWT authorization database extension".to_string(),
HttpConnError::JwtPayloadError(p) => p.to_string(),
HttpConnError::GetAuthInfo(c) => c.to_string_client(),
HttpConnError::AuthError(c) => c.to_string_client(),
@@ -379,6 +421,7 @@ impl CouldRetry for HttpConnError {
match self {
HttpConnError::PostgresConnectionError(e) => e.could_retry(),
HttpConnError::LocalProxyConnectionError(e) => e.could_retry(),
HttpConnError::ComputeCtl(_) => false,
HttpConnError::ConnectionClosedAbruptly(_) => false,
HttpConnError::JwtPayloadError(_) => false,
HttpConnError::GetAuthInfo(_) => false,

View File

@@ -1,3 +1,14 @@
//! Manages the pool of connections between local_proxy and postgres.
//!
//! The pool is keyed by database and role_name, and can contain multiple connections
//! shared between users.
//!
//! The pool manages the pg_session_jwt extension used for authorizing
//! requests in the db.
//!
//! The first time a db/role pair is seen, local_proxy attempts to install the extension
//! and grant usage to the role on the given schema.
use std::collections::HashMap;
use std::pin::pin;
use std::sync::{Arc, Weak};
@@ -28,14 +39,15 @@ use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use crate::{DbName, RoleName};
pub(crate) const EXT_NAME: &str = "pg_session_jwt";
pub(crate) const EXT_VERSION: &str = "0.1.1";
pub(crate) const EXT_SCHEMA: &str = "auth";
struct ConnPoolEntry<C: ClientInnerExt> {
conn: ClientInner<C>,
_last_access: std::time::Instant,
}
// /// key id for the pg_session_jwt state
// static PG_SESSION_JWT_KID: AtomicU64 = AtomicU64::new(1);
// Per-endpoint connection pool, (dbname, username) -> DbUserConnPool
// Number of open connections is limited by the `max_conns_per_endpoint`.
pub(crate) struct EndpointConnPool<C: ClientInnerExt> {
@@ -141,11 +153,18 @@ impl<C: ClientInnerExt> Drop for EndpointConnPool<C> {
pub(crate) struct DbUserConnPool<C: ClientInnerExt> {
conns: Vec<ConnPoolEntry<C>>,
// true if we have definitely installed the extension and
// granted the role access to the auth schema.
initialized: bool,
}
impl<C: ClientInnerExt> Default for DbUserConnPool<C> {
fn default() -> Self {
Self { conns: Vec::new() }
Self {
conns: Vec::new(),
initialized: false,
}
}
}
@@ -200,25 +219,16 @@ impl<C: ClientInnerExt> LocalConnPool<C> {
self.config.pool_options.idle_timeout
}
// pub(crate) fn shutdown(&self) {
// let mut pool = self.global_pool.write();
// pool.pools.clear();
// pool.total_conns = 0;
// }
pub(crate) fn get(
self: &Arc<Self>,
ctx: &RequestMonitoring,
conn_info: &ConnInfo,
) -> Result<Option<LocalClient<C>>, HttpConnError> {
let mut client: Option<ClientInner<C>> = None;
if let Some(entry) = self
let client = self
.global_pool
.write()
.get_conn_entry(conn_info.db_and_user())
{
client = Some(entry.conn);
}
.map(|entry| entry.conn);
// ok return cached connection if found and establish a new one otherwise
if let Some(client) = client {
@@ -246,6 +256,23 @@ impl<C: ClientInnerExt> LocalConnPool<C> {
}
Ok(None)
}
pub(crate) fn initialized(self: &Arc<Self>, conn_info: &ConnInfo) -> bool {
self.global_pool
.read()
.pools
.get(&conn_info.db_and_user())
.map_or(false, |pool| pool.initialized)
}
pub(crate) fn set_initialized(self: &Arc<Self>, conn_info: &ConnInfo) {
self.global_pool
.write()
.pools
.entry(conn_info.db_and_user())
.or_default()
.initialized = true;
}
}
#[allow(clippy::too_many_arguments)]